diff --git a/.gitignore b/.gitignore index c5d92c3..3140174 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*~ target/ pom.xml.tag pom.xml.releaseBackup diff --git a/README.md b/README.md index e291947..d658d2c 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,11 @@ The MySQLSleepBasedCondition is based on the MySQL ``SLEEP()`` and ``KILL QUERY` The thread that is woken up is guaranteed to be the one that has waited the longest. +TODO: Try other MySQL features for locking that may perform better than SLEEP(): +- http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock +- https://dev.mysql.com/doc/internals/en/debug-sync-facility.html +- https://dev.mysql.com/doc/internals/en/wait-condition.html + Queue ----- diff --git a/pom.xml b/pom.xml index efbdd12..a2779b4 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,9 @@ jar + + -Xdoclint:none + diff --git a/src/main/java/net/bramp/concurrent/ConcurrentBitSet.java b/src/main/java/net/bramp/concurrent/ConcurrentBitSet.java index 28805e4..e38fc0e 100644 --- a/src/main/java/net/bramp/concurrent/ConcurrentBitSet.java +++ b/src/main/java/net/bramp/concurrent/ConcurrentBitSet.java @@ -6,168 +6,175 @@ /** * Simple wrapper around BitSet to make it thread safe + * * @author bramp * */ public class ConcurrentBitSet { - final BitSet set; - - public ConcurrentBitSet() { - set = new BitSet(); - } - - public ConcurrentBitSet(int nbits) { - set = new BitSet(nbits); - } - - /** - * Atomically sets the value to the given updated value if the current value == the expected value. - * - * @param bitIndex - * @param expect - * @param update - * @return true if successful. False return indicates that the actual value was not equal to the expected value. - */ - public synchronized boolean compareAndSet(int bitIndex, boolean expect, boolean update) { - if (get(bitIndex) == expect) { - set(bitIndex, update); - return true; - } - return false; - } - - /** - * Return a set of clear indexes - * @return - */ - public Set getClearBits(int max) { - Set missing = new TreeSet(); - int nextBit = -1; - while (true) { - nextBit = this.nextClearBit(nextBit + 1); - if (nextBit >= max) - break; - missing.add(nextBit); - } - return missing; - } - - public synchronized byte[] toByteArray() { - return set.toByteArray(); - } - - public synchronized long[] toLongArray() { - return set.toLongArray(); - } - - public synchronized void flip(int bitIndex) { - set.flip(bitIndex); - } - - public synchronized void flip(int fromIndex, int toIndex) { - set.flip(fromIndex, toIndex); - } - - public synchronized void set(int bitIndex) { - set.set(bitIndex); - } - - public synchronized void set(int bitIndex, boolean value) { - set.set(bitIndex, value); - } - - public synchronized void set(int fromIndex, int toIndex) { - set.set(fromIndex, toIndex); - } - - public synchronized void set(int fromIndex, int toIndex, boolean value) { - set.set(fromIndex, toIndex, value); - } - - public synchronized void clear(int bitIndex) { - set.clear(bitIndex); - } - - public synchronized void clear(int fromIndex, int toIndex) { - set.clear(fromIndex, toIndex); - } - - public synchronized void clear() { - set.clear(); - } - - public synchronized boolean get(int bitIndex) { - return set.get(bitIndex); - } - - public synchronized BitSet get(int fromIndex, int toIndex) { - return set.get(fromIndex, toIndex); - } - - public synchronized int nextSetBit(int fromIndex) { - return set.nextSetBit(fromIndex); - } - - public synchronized int nextClearBit(int fromIndex) { - return set.nextClearBit(fromIndex); - } - - public synchronized int previousSetBit(int fromIndex) { - return set.previousSetBit(fromIndex); - } - - public synchronized int previousClearBit(int fromIndex) { - return set.previousClearBit(fromIndex); - } - - public synchronized int length() { - return set.length(); - } - - public synchronized boolean isEmpty() { - return set.isEmpty(); - } - - public synchronized boolean intersects(BitSet set) { - return set.intersects(set); - } - - public synchronized int cardinality() { - return set.cardinality(); - } - - public synchronized void and(BitSet set) { - set.and(set); - } - - public synchronized void or(BitSet set) { - set.or(set); - } - - public synchronized void xor(BitSet set) { - set.xor(set); - } - - public synchronized void andNot(BitSet set) { - set.andNot(set); - } - public synchronized int hashCode() { - return set.hashCode(); - } + final BitSet set; + + public ConcurrentBitSet() { + set = new BitSet(); + } + + public ConcurrentBitSet(int nbits) { + set = new BitSet(nbits); + } + + /** + * Atomically sets the value to the given updated value if the current value + * == the expected value. + * + * @param bitIndex + * @param expect + * @param update + * @return true if successful. False return indicates that the actual value + * was not equal to the expected value. + */ + public synchronized boolean compareAndSet(int bitIndex, boolean expect, boolean update) { + if (get(bitIndex) == expect) { + set(bitIndex, update); + return true; + } + return false; + } + + /** + * Return a set of clear indexes + * + * @return + */ + public Set getClearBits(int max) { + Set missing = new TreeSet(); + int nextBit = -1; + while (true) { + nextBit = this.nextClearBit(nextBit + 1); + if (nextBit >= max) { + break; + } + missing.add(nextBit); + } + return missing; + } + + public synchronized byte[] toByteArray() { + return set.toByteArray(); + } + + public synchronized long[] toLongArray() { + return set.toLongArray(); + } + + public synchronized void flip(int bitIndex) { + set.flip(bitIndex); + } + + public synchronized void flip(int fromIndex, int toIndex) { + set.flip(fromIndex, toIndex); + } + + public synchronized void set(int bitIndex) { + set.set(bitIndex); + } + + public synchronized void set(int bitIndex, boolean value) { + set.set(bitIndex, value); + } + + public synchronized void set(int fromIndex, int toIndex) { + set.set(fromIndex, toIndex); + } + + public synchronized void set(int fromIndex, int toIndex, boolean value) { + set.set(fromIndex, toIndex, value); + } + + public synchronized void clear(int bitIndex) { + set.clear(bitIndex); + } + + public synchronized void clear(int fromIndex, int toIndex) { + set.clear(fromIndex, toIndex); + } + + public synchronized void clear() { + set.clear(); + } + + public synchronized boolean get(int bitIndex) { + return set.get(bitIndex); + } + + public synchronized BitSet get(int fromIndex, int toIndex) { + return set.get(fromIndex, toIndex); + } + + public synchronized int nextSetBit(int fromIndex) { + return set.nextSetBit(fromIndex); + } + + public synchronized int nextClearBit(int fromIndex) { + return set.nextClearBit(fromIndex); + } + + public synchronized int previousSetBit(int fromIndex) { + return set.previousSetBit(fromIndex); + } + + public synchronized int previousClearBit(int fromIndex) { + return set.previousClearBit(fromIndex); + } + + public synchronized int length() { + return set.length(); + } + + public synchronized boolean isEmpty() { + return set.isEmpty(); + } + + public synchronized boolean intersects(BitSet set) { + return set.intersects(set); + } + + public synchronized int cardinality() { + return set.cardinality(); + } + + public synchronized void and(BitSet set) { + set.and(set); + } + + public synchronized void or(BitSet set) { + set.or(set); + } + + public synchronized void xor(BitSet set) { + set.xor(set); + } + + public synchronized void andNot(BitSet set) { + set.andNot(set); + } - public synchronized int size() { - return set.size(); - } + public synchronized int hashCode() { + return set.hashCode(); + } - public synchronized boolean equals(Object obj) { - if (obj instanceof ConcurrentBitSet) - return set.equals( ((ConcurrentBitSet)obj).set ); + public synchronized int size() { + return set.size(); + } - return false; - } + public synchronized boolean equals(Object obj) { + if (obj instanceof ConcurrentBitSet) { + return set.equals(((ConcurrentBitSet) obj).set); + } + + return false; + } - public synchronized String toString() { - return set.toString(); - } -} \ No newline at end of file + public synchronized String toString() { + return set.toString(); + } +} diff --git a/src/main/java/net/bramp/concurrent/Futures.java b/src/main/java/net/bramp/concurrent/Futures.java index 5d910e4..036d2c4 100644 --- a/src/main/java/net/bramp/concurrent/Futures.java +++ b/src/main/java/net/bramp/concurrent/Futures.java @@ -12,93 +12,99 @@ import java.util.concurrent.TimeoutException; public final class Futures { - private Futures() {} - - /** - * Calls get() on all futures waiting for responses If any future throws an - * Exception, this method throws it (without returning the list of - * successful responses) - * - * @param futures - * @return - * @throws InterruptedException - * @throws ExecutionException - */ - public static List getAll(List> futures) - throws InterruptedException, ExecutionException { - List results = new ArrayList(futures.size()); - for (Future future : futures) { - results.add(future.get()); - } - return Collections.unmodifiableList(results); - } - - /** - * Calls get() on all futures waiting for responses until the timeout If any - * future throws an Exception, this method throws it (without returning the - * list of successful responses) - * - * @param futures - * @param timeout - * @param unit - * @return - * @throws InterruptedException - * @throws ExecutionException - * @throws TimeoutException - */ - @SuppressWarnings("unchecked") - public static List getAll(List> futures, long timeout, - TimeUnit unit) throws InterruptedException, ExecutionException, - TimeoutException { - - long timeoutNS = unit.toNanos(timeout); - long deadline = System.nanoTime() + timeoutNS; - - // Make a copy - futures = new LinkedList>(futures); - - final int size = futures.size(); - BitSet done = new BitSet(size); - - V[] results = (V[]) new Object[size]; - while (done.cardinality() < size) { - - for (int i = 0; i < size; i++) { - // TODO - We could use done.nextClearBit(i) - if (done.get(i)) - continue; - - Future future = futures.get(i); - try { - // We wait just a fraction, to give everyone at least two chances - results[i] = future.get(timeoutNS / (2 * size), TimeUnit.NANOSECONDS); - done.set(i); - - } catch (ExecutionException e) { - unwrapExecutionException(e); - - } catch (TimeoutException e) { - // If we have exceeded our deadline, throw, otherwise - if (System.nanoTime() >= deadline) - throw e; - } - } - } - - return Collections.unmodifiableList(Arrays.asList(results)); - } - - /** - * Sometimes InterruptedException is wrapped in an ExecutionException - * I don't think that's correct behavior, but lets fix it here - * @param e - * @throws InterruptedException - * @throws ExecutionException - */ - public static void unwrapExecutionException(ExecutionException e) throws InterruptedException, ExecutionException { - if (e.getCause() instanceof InterruptedException) - throw (InterruptedException)e.getCause(); - - throw e; - } + + private Futures() { + } + + /** + * Calls get() on all futures waiting for responses If any future throws an + * Exception, this method throws it (without returning the list of + * successful responses) + * + * @param futures + * @return + * @throws InterruptedException + * @throws ExecutionException + */ + public static List getAll(List> futures) + throws InterruptedException, ExecutionException { + List results = new ArrayList(futures.size()); + for (Future future : futures) { + results.add(future.get()); + } + return Collections.unmodifiableList(results); + } + + /** + * Calls get() on all futures waiting for responses until the timeout If any + * future throws an Exception, this method throws it (without returning the + * list of successful responses) + * + * @param futures + * @param timeout + * @param unit + * @return + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + @SuppressWarnings("unchecked") + public static List getAll(List> futures, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException { + + long timeoutNS = unit.toNanos(timeout); + long deadline = System.nanoTime() + timeoutNS; + + // Make a copy + futures = new LinkedList>(futures); + + final int size = futures.size(); + BitSet done = new BitSet(size); + + V[] results = (V[]) new Object[size]; + while (done.cardinality() < size) { + + for (int i = 0; i < size; i++) { + // TODO - We could use done.nextClearBit(i) + if (done.get(i)) { + continue; + } + + Future future = futures.get(i); + try { + // We wait just a fraction, to give everyone at least two chances + results[i] = future.get(timeoutNS / (2 * size), TimeUnit.NANOSECONDS); + done.set(i); + + } catch (ExecutionException e) { + unwrapExecutionException(e); + + } catch (TimeoutException e) { + // If we have exceeded our deadline, throw, otherwise + if (System.nanoTime() >= deadline) { + throw e; + } + } + } + } + + return Collections.unmodifiableList(Arrays.asList(results)); + } + + /** + * Sometimes InterruptedException is wrapped in an ExecutionException I + * don't think that's correct behavior, but lets fix it here + * + * @param e + * @throws InterruptedException + * @throws ExecutionException + */ + public static void unwrapExecutionException(ExecutionException e) throws InterruptedException, ExecutionException { + if (e.getCause() instanceof InterruptedException) { + throw (InterruptedException) e.getCause(); + } + + throw e; + } } diff --git a/src/main/java/net/bramp/db_patterns/locks/MySQLLockBasedCondition.java b/src/main/java/net/bramp/db_patterns/locks/MySQLLockBasedCondition.java new file mode 100644 index 0000000..08c3d32 --- /dev/null +++ b/src/main/java/net/bramp/db_patterns/locks/MySQLLockBasedCondition.java @@ -0,0 +1,240 @@ +package net.bramp.db_patterns.locks; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + +import javax.annotation.Nonnull; +import javax.sql.DataSource; + +import net.bramp.sql.ResultSetFilter; +import net.bramp.sql.ResultSets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses the MySQL GET_LOCK() / RELEASE_LOCK() to implement a distributed Condition. + * For use only with MySQL version 5.7.5 and up, where multiple simultaneous locks + * can be acquired and GET_LOCK() does not release any existing locks. + * @author xgp + */ +public class MySQLLockBasedCondition implements Condition { + + final static Logger LOG = LoggerFactory.getLogger(MySQLLockBasedCondition.class); + + static long DEFAULT_WAIT = 60000000000L; + + final static String sleepQuery = "SELECT SLEEP(?), ?;"; + final static String wakeQuery = "KILL QUERY ?;"; + + final static String listQueryNew + = // MySQL 5.1.7 or newer + "SELECT Id, User, Host, Db, Command, Time, State, Info FROM " + + "INFORMATION_SCHEMA.PROCESSLIST " + + "WHERE STATE = 'User sleep' AND INFO LIKE ? " + + "ORDER BY TIME"; + + final static String listQueryOld = "SHOW PROCESSLIST;"; + + final boolean useListQueryNew = false; + + final DataSource ds; + final String lockName; + + final ResultSetFilter.Predicate isOurLockPredicate = new ResultSetFilter.Predicate() { + + public boolean apply(ResultSet rs) throws SQLException { + if (LOG.isDebugEnabled()) { + LOG.debug("ResultSet {}", ResultSets.toString(rs)); + } + + String state = rs.getString(7); + if (state != null && !state.equals("User sleep")) { + return false; + } + + String info = rs.getString(8); + return info != null && info.matches("SELECT SLEEP\\([\\d.]+\\), '" + lockName + "'"); + } + }; + + public MySQLLockBasedCondition(@Nonnull DataSource ds, @Nonnull String lockName) { + this.ds = ds; + this.lockName = lockName; + + // TODO Detect MySQL version (update useListQueryNew) + // TODO Detect if we can sleep/kill + } + + /** + * @param nanosTimeout The number of nanoseconds to wait + * @return true if awaken (correctly, or spuriously), false if timeout + * @throws InterruptedException + */ + protected boolean awaitNanosInternal(long nanosTimeout) throws InterruptedException { + if (nanosTimeout <= 0) { + return false; + } + + long now = System.nanoTime(); + + try { + Connection c = ds.getConnection(); + try { + PreparedStatement s = c.prepareStatement(sleepQuery); + try { + + // Adjust nanosTimeout (due to time it took to get a connection) + nanosTimeout -= (System.nanoTime() - now); + + // Convert to seconds, but round to whole number of milliseconds + float timeout = Math.round(nanosTimeout / 1000000.0) / 1000f; + if (timeout <= 0) { + LOG.trace("After connection acquired, timeout was negative or zero: {}", timeout); + return false; + } + + s.setFloat(1, timeout); + s.setString(2, lockName); + s.execute(); + + ResultSet rs = s.getResultSet(); + if (rs != null && rs.next()) { + return rs.getInt(1) == 1; + } + + return true; + + } finally { + s.close(); + } + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override public long awaitNanos(long nanosTimeout) throws InterruptedException { + long now = System.nanoTime(); + awaitNanosInternal(nanosTimeout); + return System.nanoTime() - now; + } + + @Override public void await() throws InterruptedException { + while (!awaitNanosInternal(DEFAULT_WAIT)) { + // Keep looping, until we expire before our timeout or are interuptted + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // TODO There is a race condition here. Between iterations we might miss a wakeup + } + } + + @Override public void awaitUninterruptibly() { + while (true) { + try { + await(); + break; + } catch (InterruptedException e) { + } + } + } + + @Override public boolean await(long time, TimeUnit unit) throws InterruptedException { + return awaitNanosInternal(unit.toNanos(time)); + } + + @Override public boolean awaitUntil(Date deadline) throws InterruptedException { + long duration = deadline.getTime() - System.currentTimeMillis(); + return awaitNanosInternal(TimeUnit.MILLISECONDS.toNanos(duration)); + } + + /** + * Get a list of the other threads waiting + * + * @throws SQLException + */ + protected ResultSet findLockThreads(@Nonnull Connection c) throws SQLException { + PreparedStatement s = null; + + if (useListQueryNew) { + s = c.prepareStatement(listQueryNew); + s.setString(1, "SELECT SLEEP(%" + lockName + "%"); + } else { + s = c.prepareStatement(listQueryOld); + } + return new ResultSetFilter(s.executeQuery(), isOurLockPredicate); + } + + protected void killThread(@Nonnull Connection c, long threadId) throws SQLException { + LOG.debug("Killing thread {}", threadId); + + PreparedStatement s = c.prepareStatement(wakeQuery); + s.setLong(1, threadId); + s.execute(); + } + + /** + * Will signal the thread that's been waiting the longest + */ + @Override public void signal() { + try { + Connection c = ds.getConnection(); + try { + // Find a list of blocked threads to wake up + ResultSet threads = findLockThreads(c); + if (!threads.next()) { + LOG.debug("Nothing to wake up for '{}'", lockName); + return; + } + long toWake = threads.getLong(1); + threads.close(); + + killThread(c, toWake); + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override public void signalAll() { + try { + Connection c = ds.getConnection(); + try { + // Find a list of blocked threads to wake up + List toWake = new ArrayList(); + ResultSet threads = findLockThreads(c); + while (threads.next()) { + toWake.add(threads.getLong(1)); + } + threads.close(); + + for (Long id : toWake) { + killThread(c, id); + } + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/net/bramp/db_patterns/locks/MySQLSleepBasedCondition.java b/src/main/java/net/bramp/db_patterns/locks/MySQLSleepBasedCondition.java index f366930..46143e0 100644 --- a/src/main/java/net/bramp/db_patterns/locks/MySQLSleepBasedCondition.java +++ b/src/main/java/net/bramp/db_patterns/locks/MySQLSleepBasedCondition.java @@ -26,203 +26,214 @@ */ public class MySQLSleepBasedCondition implements Condition { - final static Logger LOG = LoggerFactory.getLogger(MySQLSleepBasedCondition.class); - - static long DEFAULT_WAIT = 60000000000L; - - final static String sleepQuery = "SELECT SLEEP(?), ?;"; - final static String wakeQuery = "KILL QUERY ?;"; - - final static String listQueryNew = // MySQL 5.1.7 or newer - "SELECT Id, User, Host, Db, Command, Time, State, Info FROM " + - "INFORMATION_SCHEMA.PROCESSLIST " + - "WHERE STATE = 'User sleep' AND INFO LIKE ? " + - "ORDER BY TIME"; - - final static String listQueryOld = "SHOW PROCESSLIST;"; - - final boolean useListQueryNew = false; - - final DataSource ds; - final String lockName; - - final ResultSetFilter.Predicate isOurLockPredicate = new ResultSetFilter.Predicate() { - - public boolean apply(ResultSet rs) throws SQLException { - if (LOG.isDebugEnabled()) { - LOG.debug("ResultSet {}", ResultSets.toString(rs)); - } - - String state = rs.getString(7); - if (state != null && !state.equals("User sleep")) - return false; - - String info = rs.getString(8); - return info != null && info.matches("SELECT SLEEP\\([\\d.]+\\), '" + lockName + "'"); - } - }; - - public MySQLSleepBasedCondition(@Nonnull DataSource ds, @Nonnull String lockName) { - this.ds = ds; - this.lockName = lockName; - - // TODO Detect MySQL version (update useListQueryNew) - // TODO Detect if we can sleep/kill - } - - /** - * @param nanosTimeout The number of nanoseconds to wait - * @return true if awaken (correctly, or spuriously), false if timeout - * @throws InterruptedException - */ - protected boolean awaitNanosInternal(long nanosTimeout) throws InterruptedException { - if (nanosTimeout <= 0) - return false; - - long now = System.nanoTime(); - - try { - Connection c = ds.getConnection(); - try { - PreparedStatement s = c.prepareStatement(sleepQuery); - try { - - // Adjust nanosTimeout (due to time it took to get a connection) - nanosTimeout -= (System.nanoTime() - now); - - // Convert to seconds, but round to whole number of milliseconds - s.setFloat(1, Math.round(nanosTimeout / 1000000.0) / 1000f); - s.setString(2, lockName); - s.execute(); - - ResultSet rs = s.getResultSet(); - if (rs != null && rs.next()) - return rs.getInt(1) == 1; - - return true; - - } finally { - s.close(); - } - - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public long awaitNanos(long nanosTimeout) throws InterruptedException { - long now = System.nanoTime(); - awaitNanosInternal(nanosTimeout); - return System.nanoTime() - now; - } - - public void await() throws InterruptedException { - while (!awaitNanosInternal(DEFAULT_WAIT)) { - // Keep looping, until we expire before our timeout or are interuptted - if (Thread.interrupted()) - throw new InterruptedException(); - - // TODO There is a race condition here. Between iterations we might miss a wakeup - } - } - - public void awaitUninterruptibly() { - while (true) { - try { - await(); - break; - } catch (InterruptedException e) { - } - } - } - - public boolean await(long time, TimeUnit unit) throws InterruptedException { - return awaitNanosInternal(unit.toNanos(time)); - } - - public boolean awaitUntil(Date deadline) throws InterruptedException { - long duration = deadline.getTime() - System.currentTimeMillis(); - return awaitNanosInternal(TimeUnit.MILLISECONDS.toNanos(duration)); - } - - /** - * Get a list of the other threads waiting - * - * @throws SQLException - */ - protected ResultSet findLockThreads(@Nonnull Connection c) throws SQLException { - PreparedStatement s = null; - - if (useListQueryNew) { - s = c.prepareStatement(listQueryNew); - s.setString(1, "SELECT SLEEP(%" + lockName + "%"); - } else { - s = c.prepareStatement(listQueryOld); - } - return new ResultSetFilter(s.executeQuery(), isOurLockPredicate); - } - - protected void killThread(@Nonnull Connection c, long threadId) throws SQLException { - LOG.debug("Killing thread {}", threadId); - - PreparedStatement s = c.prepareStatement(wakeQuery); - s.setLong(1, threadId); - s.execute(); - } - - /** - * Will signal the thread that's been waiting the longest - */ - public void signal() { - try { - Connection c = ds.getConnection(); - try { - // Find a list of blocked threads to wake up - ResultSet threads = findLockThreads(c); - if (!threads.next()) { - LOG.debug("Nothing to wake up for '{}'", lockName); - return; - } - long toWake = threads.getLong(1); - threads.close(); - - killThread(c, toWake); - - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public void signalAll() { - try { - Connection c = ds.getConnection(); - try { - // Find a list of blocked threads to wake up - List toWake = new ArrayList(); - ResultSet threads = findLockThreads(c); - while (threads.next()) { - toWake.add(threads.getLong(1)); - } - threads.close(); - - for (Long id : toWake) { - killThread(c, id); - } - - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } + final static Logger LOG = LoggerFactory.getLogger(MySQLSleepBasedCondition.class); + + static long DEFAULT_WAIT = 60000000000L; + + final static String sleepQuery = "SELECT SLEEP(?), ?;"; + final static String wakeQuery = "KILL QUERY ?;"; + + final static String listQueryNew + = // MySQL 5.1.7 or newer + "SELECT Id, User, Host, Db, Command, Time, State, Info FROM " + + "INFORMATION_SCHEMA.PROCESSLIST " + + "WHERE STATE = 'User sleep' AND INFO LIKE ? " + + "ORDER BY TIME"; + + final static String listQueryOld = "SHOW PROCESSLIST;"; + + final boolean useListQueryNew = false; + + final DataSource ds; + final String lockName; + + final ResultSetFilter.Predicate isOurLockPredicate = new ResultSetFilter.Predicate() { + + public boolean apply(ResultSet rs) throws SQLException { + if (LOG.isDebugEnabled()) { + LOG.debug("ResultSet {}", ResultSets.toString(rs)); + } + + String state = rs.getString(7); + if (state != null && !state.equals("User sleep")) { + return false; + } + + String info = rs.getString(8); + return info != null && info.matches("SELECT SLEEP\\([\\d.]+\\), '" + lockName + "'"); + } + }; + + public MySQLSleepBasedCondition(@Nonnull DataSource ds, @Nonnull String lockName) { + this.ds = ds; + this.lockName = lockName; + + // TODO Detect MySQL version (update useListQueryNew) + // TODO Detect if we can sleep/kill + } + + /** + * @param nanosTimeout The number of nanoseconds to wait + * @return true if awaken (correctly, or spuriously), false if timeout + * @throws InterruptedException + */ + protected boolean awaitNanosInternal(long nanosTimeout) throws InterruptedException { + if (nanosTimeout <= 0) { + return false; + } + + long now = System.nanoTime(); + + try { + Connection c = ds.getConnection(); + try { + PreparedStatement s = c.prepareStatement(sleepQuery); + try { + + // Adjust nanosTimeout (due to time it took to get a connection) + nanosTimeout -= (System.nanoTime() - now); + + // Convert to seconds, but round to whole number of milliseconds + float timeout = Math.round(nanosTimeout / 1000000.0) / 1000f; + if (timeout <= 0) { + LOG.trace("After connection acquired, timeout was negative or zero: {}", timeout); + return false; + } + + s.setFloat(1, timeout); + s.setString(2, lockName); + s.execute(); + + ResultSet rs = s.getResultSet(); + if (rs != null && rs.next()) { + return rs.getInt(1) == 1; + } + + return true; + + } finally { + s.close(); + } + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public long awaitNanos(long nanosTimeout) throws InterruptedException { + long now = System.nanoTime(); + awaitNanosInternal(nanosTimeout); + return System.nanoTime() - now; + } + + public void await() throws InterruptedException { + while (!awaitNanosInternal(DEFAULT_WAIT)) { + // Keep looping, until we expire before our timeout or are interuptted + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // TODO There is a race condition here. Between iterations we might miss a wakeup + } + } + + public void awaitUninterruptibly() { + while (true) { + try { + await(); + break; + } catch (InterruptedException e) { + } + } + } + + public boolean await(long time, TimeUnit unit) throws InterruptedException { + return awaitNanosInternal(unit.toNanos(time)); + } + + public boolean awaitUntil(Date deadline) throws InterruptedException { + long duration = deadline.getTime() - System.currentTimeMillis(); + return awaitNanosInternal(TimeUnit.MILLISECONDS.toNanos(duration)); + } + + /** + * Get a list of the other threads waiting + * + * @throws SQLException + */ + protected ResultSet findLockThreads(@Nonnull Connection c) throws SQLException { + PreparedStatement s = null; + + if (useListQueryNew) { + s = c.prepareStatement(listQueryNew); + s.setString(1, "SELECT SLEEP(%" + lockName + "%"); + } else { + s = c.prepareStatement(listQueryOld); + } + return new ResultSetFilter(s.executeQuery(), isOurLockPredicate); + } + + protected void killThread(@Nonnull Connection c, long threadId) throws SQLException { + LOG.debug("Killing thread {}", threadId); + + PreparedStatement s = c.prepareStatement(wakeQuery); + s.setLong(1, threadId); + s.execute(); + } + + /** + * Will signal the thread that's been waiting the longest + */ + public void signal() { + try { + Connection c = ds.getConnection(); + try { + // Find a list of blocked threads to wake up + ResultSet threads = findLockThreads(c); + if (!threads.next()) { + LOG.debug("Nothing to wake up for '{}'", lockName); + return; + } + long toWake = threads.getLong(1); + threads.close(); + + killThread(c, toWake); + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public void signalAll() { + try { + Connection c = ds.getConnection(); + try { + // Find a list of blocked threads to wake up + List toWake = new ArrayList(); + ResultSet threads = findLockThreads(c); + while (threads.next()) { + toWake.add(threads.getLong(1)); + } + threads.close(); + + for (Long id : toWake) { + killThread(c, id); + } + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/net/bramp/db_patterns/queues/AbstractBlockingQueue.java b/src/main/java/net/bramp/db_patterns/queues/AbstractBlockingQueue.java index 60c942e..b57a1fc 100644 --- a/src/main/java/net/bramp/db_patterns/queues/AbstractBlockingQueue.java +++ b/src/main/java/net/bramp/db_patterns/queues/AbstractBlockingQueue.java @@ -8,133 +8,139 @@ /** * To keep code neat, most of the simple methods are here + * * @author bramp * */ abstract class AbstractBlockingQueue implements BlockingQueue { - public boolean isEmpty() { - return size() == 0; - } - - public boolean offer(E e) { - return add(e); - } - - public E element() { - E head = peek(); - if (head == null) - throw new NoSuchElementException(); - return head; - } - - public E remove() { - E head = poll(); - if (head == null) - throw new NoSuchElementException(); - return head; - } - - /** - * Blocks - */ - public E take() throws InterruptedException { - // We loop around trying to get a item, blocking at most a minute at - // a time this allows us to be interrupted - E head = null; - while (head == null) { - if (Thread.interrupted()) - throw new InterruptedException(); - - head = poll(1, TimeUnit.MINUTES); - } - return head; - } - - /** - * Blocks - */ - public void put(E e) throws InterruptedException { - add(e); - } - - /** - * No blocking - */ - public int drainTo(Collection c) { - return drainTo(c, Integer.MAX_VALUE); - } - - /** - * No blocking - */ - public int drainTo(Collection c, int maxElements) { - if (c == this) - throw new IllegalArgumentException("Draining to self is not supported"); - - int count = 0; - while (count < maxElements) { - E head = poll(); - if (head == null) - break; - - c.add(head); - count++; - } - - return maxElements - count; - } - - public void clear() { - // Lazy! just keep poll'ng them off - while (poll() != null) { - // Nothing - } - } - - public int remainingCapacity() { - return Integer.MAX_VALUE; - } - - public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - // Right now, we have no concept of a full queue, so we don't block on insert - return offer(e); - } - - ////// Nothing supported below - - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - public Object[] toArray() { - throw new UnsupportedOperationException(); } - - public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); - } - - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } + public boolean isEmpty() { + return size() == 0; + } + + public boolean offer(E e) { + return add(e); + } + + public E element() { + E head = peek(); + if (head == null) { + throw new NoSuchElementException(); + } + return head; + } + + public E remove() { + E head = poll(); + if (head == null) { + throw new NoSuchElementException(); + } + return head; + } + + /** + * Blocks + */ + public E take() throws InterruptedException { + // We loop around trying to get a item, blocking at most a minute at + // a time this allows us to be interrupted + E head = null; + while (head == null) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + head = poll(1, TimeUnit.MINUTES); + } + return head; + } + + /** + * Blocks + */ + public void put(E e) throws InterruptedException { + add(e); + } + + /** + * No blocking + */ + public int drainTo(Collection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + /** + * No blocking + */ + public int drainTo(Collection c, int maxElements) { + if (c == this) { + throw new IllegalArgumentException("Draining to self is not supported"); + } + + int count = 0; + while (count < maxElements) { + E head = poll(); + if (head == null) { + break; + } + + c.add(head); + count++; + } + + return maxElements - count; + } + + public void clear() { + // Lazy! just keep poll'ng them off + while (poll() != null) { + // Nothing + } + } + + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + // Right now, we have no concept of a full queue, so we don't block on insert + return offer(e); + } + + ////// Nothing supported below + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } } diff --git a/src/main/java/net/bramp/db_patterns/queues/MySQLBasedQueue.java b/src/main/java/net/bramp/db_patterns/queues/MySQLBasedQueue.java index b9b417f..3e34764 100644 --- a/src/main/java/net/bramp/db_patterns/queues/MySQLBasedQueue.java +++ b/src/main/java/net/bramp/db_patterns/queues/MySQLBasedQueue.java @@ -19,16 +19,12 @@ /** * A queue backed by MySQL *

- * CREATE TABLE queue ( - * id INT UNSIGNED NOT NULL AUTO_INCREMENT, - * queue_name VARCHAR(255) NOT NULL, -- Queue name - * inserted TIMESTAMP NOT NULL, -- Time the row was inserted - * inserted_by VARCHAR(255) NOT NULL, -- and by who - * acquired TIMESTAMP NULL, -- Time the row was acquired - * acquired_by VARCHAR(255) NULL, -- and by who - * value BLOB NOT NULL, -- The actual data - * PRIMARY KEY (id) - * ) ENGINE=INNODB DEFAULT CHARSET=UTF8; + * CREATE TABLE queue ( id INT UNSIGNED NOT NULL AUTO_INCREMENT, queue_name + * VARCHAR(255) NOT NULL, -- Queue name inserted TIMESTAMP NOT NULL, -- Time the + * row was inserted inserted_by VARCHAR(255) NOT NULL, -- and by who acquired + * TIMESTAMP NULL, -- Time the row was acquired acquired_by VARCHAR(255) NULL, + * -- and by who value BLOB NOT NULL, -- The actual data PRIMARY KEY (id) ) + * ENGINE=INNODB DEFAULT CHARSET=UTF8; *

* TODO Create efficient drainTo * @@ -37,240 +33,240 @@ */ public class MySQLBasedQueue extends AbstractBlockingQueue { - final static Logger LOG = LoggerFactory.getLogger(MySQLBasedQueue.class); - - final static String addQuery = "INSERT INTO queue (queue_name, inserted, inserted_by, value) values (?, now(), ?, ?)"; - final static String peekQuery = "SELECT value FROM queue WHERE acquired IS NULL AND queue_name = ? ORDER BY id ASC LIMIT 1"; - final static String sizeQuery = "SELECT COUNT(*) FROM queue WHERE acquired IS NULL AND queue_name = ?"; - - /** - * Claims one row (and keeps it in the database) - */ - final static String pollQuery[] = { - "SET @update_id := -1; ", - - "UPDATE queue SET " + - " id = (SELECT @update_id := id), " + - " acquired = NOW(), " + - " acquired_by = ? " + - "WHERE acquired IS NULL AND queue_name = ? " + - "ORDER BY id ASC " + - "LIMIT 1; ", - - "SELECT value FROM queue WHERE id = @update_id" - }; - - final static String cleanupQuery = - "DELETE FROM queue " + - "WHERE acquired IS NOT NULL " + - " AND queue_name = ? " + - " AND acquired < DATE_SUB(NOW(), INTERVAL 10 DAY)"; - - final static String cleanupAllQuery = - "DELETE FROM queue " + - "WHERE acquired IS NOT NULL " + - " AND acquired < DATE_SUB(NOW(), INTERVAL 10 DAY)"; - - final String me; - - final DataSource ds; - final String queueName; - final Class type; - - final Condition condition; - - /** - * Creates a new MySQL backed queue - * - * @param ds - * @param queueName - * @param type - * @param me The name of this node, for storing in the database table - */ - public MySQLBasedQueue(DataSource ds, String queueName, Class type, String me) { - this.ds = ds; - this.queueName = queueName; - this.type = type; - this.condition = new MySQLSleepBasedCondition(ds, "queue-" + queueName); - this.me = me; - } - - public boolean add(E value) { - try { - Connection c = ds.getConnection(); - try { - PreparedStatement s = c.prepareStatement(addQuery); - try { - s.setString(1, queueName); - s.setObject(2, me); // Inserted by me - s.setObject(3, value); - s.execute(); - - // Wake up one - condition.signal(); - - return true; - - } finally { - s.close(); - } - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * No blocking - */ - public E peek() { - try { - Connection c = ds.getConnection(); - try { - PreparedStatement s = c.prepareStatement(peekQuery); - try { - s.setString(1, queueName); - if (s.execute()) { - ResultSet rs = s.getResultSet(); - if (rs != null && rs.next()) { - return rs.getObject(1, type); - } - } - - return null; - } finally { - s.close(); - } - - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * No blocking - */ - public E poll() { - try { - Connection c = ds.getConnection(); - try { - c.setAutoCommit(false); - - CallableStatement s1 = c.prepareCall(pollQuery[0]); - s1.execute(); - - PreparedStatement s2 = c.prepareStatement(pollQuery[1]); - s2.setString(1, me); // Acquired by me - s2.setString(2, queueName); - s2.execute(); - - CallableStatement s3 = c.prepareCall(pollQuery[2]); - s3.execute(); - - c.commit(); - - if (s3.execute()) { - ResultSet rs = s3.getResultSet(); - if (rs != null && rs.next()) { - return rs.getObject(1, type); - } - } - - return null; - - } finally { - c.setAutoCommit(true); - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public int size() { - try { - Connection c = ds.getConnection(); - try { - PreparedStatement s = c.prepareStatement(sizeQuery); - s.setString(1, queueName); - s.execute(); - - ResultSet rs = s.getResultSet(); - if (rs != null && rs.next()) - return rs.getInt(1); - - throw new RuntimeException("Failed to retreive size"); - - } finally { - c.close(); - } - - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * Blocks until something is in the queue, up to timeout - * null if timeout occurs - */ - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - - final long deadlineMillis = System.currentTimeMillis() + unit.toMillis(timeout); - final Date deadline = new Date(deadlineMillis); - - E head = null; - boolean stillWaiting = true; - - while (stillWaiting) { - // Check if we can grab one - head = poll(); - if (head != null) - break; - - // Block until we are woken, or deadline - // Because we don't have a distributed lock around this condition, there is a race condition - // whereby we might miss a notify(). However, we can somewhat mitigate the problem, by using - // this in a polling fashion - stillWaiting = condition.awaitUntil(deadline); - } - - return head; - } - - public void cleanup() throws SQLException { - Connection c = ds.getConnection(); - try { - CallableStatement s = c.prepareCall(cleanupQuery); - s.setString(1, queueName); - s.execute(); - - } finally { - c.close(); - } - } - - /** - * Cleans up all queues - * - * @throws SQLException - */ - public void cleanupAll() throws SQLException { - Connection c = ds.getConnection(); - try { - CallableStatement s = c.prepareCall(cleanupAllQuery); - s.execute(); - - } finally { - c.close(); - } - } + final static Logger LOG = LoggerFactory.getLogger(MySQLBasedQueue.class); + + final static String addQuery = "INSERT INTO queue (queue_name, inserted, inserted_by, value) values (?, now(), ?, ?)"; + final static String peekQuery = "SELECT value FROM queue WHERE acquired IS NULL AND queue_name = ? ORDER BY id ASC LIMIT 1"; + final static String sizeQuery = "SELECT COUNT(*) FROM queue WHERE acquired IS NULL AND queue_name = ?"; + + /** + * Claims one row (and keeps it in the database) + */ + final static String pollQuery[] = { + "SET @update_id := -1; ", + "UPDATE queue SET " + + " id = (SELECT @update_id := id), " + + " acquired = NOW(), " + + " acquired_by = ? " + + "WHERE acquired IS NULL AND queue_name = ? " + + "ORDER BY id ASC " + + "LIMIT 1; ", + "SELECT value FROM queue WHERE id = @update_id" + }; + + final static String cleanupQuery + = "DELETE FROM queue " + + "WHERE acquired IS NOT NULL " + + " AND queue_name = ? " + + " AND acquired < DATE_SUB(NOW(), INTERVAL 10 DAY)"; + + final static String cleanupAllQuery + = "DELETE FROM queue " + + "WHERE acquired IS NOT NULL " + + " AND acquired < DATE_SUB(NOW(), INTERVAL 10 DAY)"; + + final String me; + + final DataSource ds; + final String queueName; + final Class type; + + final Condition condition; + + /** + * Creates a new MySQL backed queue + * + * @param ds + * @param queueName + * @param type + * @param me The name of this node, for storing in the database table + */ + public MySQLBasedQueue(DataSource ds, String queueName, Class type, String me) { + this.ds = ds; + this.queueName = queueName; + this.type = type; + this.condition = new MySQLSleepBasedCondition(ds, "queue-" + queueName); + this.me = me; + } + + public boolean add(E value) { + try { + Connection c = ds.getConnection(); + try { + PreparedStatement s = c.prepareStatement(addQuery); + try { + s.setString(1, queueName); + s.setObject(2, me); // Inserted by me + s.setObject(3, value); + s.execute(); + + // Wake up one + condition.signal(); + + return true; + + } finally { + s.close(); + } + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * No blocking + */ + public E peek() { + try { + Connection c = ds.getConnection(); + try { + PreparedStatement s = c.prepareStatement(peekQuery); + try { + s.setString(1, queueName); + if (s.execute()) { + ResultSet rs = s.getResultSet(); + if (rs != null && rs.next()) { + return rs.getObject(1, type); + } + } + + return null; + } finally { + s.close(); + } + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * No blocking + */ + public E poll() { + try { + Connection c = ds.getConnection(); + try { + c.setAutoCommit(false); + + CallableStatement s1 = c.prepareCall(pollQuery[0]); + s1.execute(); + + PreparedStatement s2 = c.prepareStatement(pollQuery[1]); + s2.setString(1, me); // Acquired by me + s2.setString(2, queueName); + s2.execute(); + + CallableStatement s3 = c.prepareCall(pollQuery[2]); + s3.execute(); + + c.commit(); + + if (s3.execute()) { + ResultSet rs = s3.getResultSet(); + if (rs != null && rs.next()) { + return rs.getObject(1, type); + } + } + + return null; + + } finally { + c.setAutoCommit(true); + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int size() { + try { + Connection c = ds.getConnection(); + try { + PreparedStatement s = c.prepareStatement(sizeQuery); + s.setString(1, queueName); + s.execute(); + + ResultSet rs = s.getResultSet(); + if (rs != null && rs.next()) { + return rs.getInt(1); + } + + throw new RuntimeException("Failed to retreive size"); + + } finally { + c.close(); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Blocks until something is in the queue, up to timeout null if timeout + * occurs + */ + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + + final long deadlineMillis = System.currentTimeMillis() + unit.toMillis(timeout); + final Date deadline = new Date(deadlineMillis); + + E head = null; + boolean stillWaiting = true; + + while (stillWaiting) { + // Check if we can grab one + head = poll(); + if (head != null) { + break; + } + + // Block until we are woken, or deadline + // Because we don't have a distributed lock around this condition, there is a race condition + // whereby we might miss a notify(). However, we can somewhat mitigate the problem, by using + // this in a polling fashion + stillWaiting = condition.awaitUntil(deadline); + } + + return head; + } + + public void cleanup() throws SQLException { + Connection c = ds.getConnection(); + try { + CallableStatement s = c.prepareCall(cleanupQuery); + s.setString(1, queueName); + s.execute(); + + } finally { + c.close(); + } + } + + /** + * Cleans up all queues + * + * @throws SQLException + */ + public void cleanupAll() throws SQLException { + Connection c = ds.getConnection(); + try { + CallableStatement s = c.prepareCall(cleanupAllQuery); + s.execute(); + + } finally { + c.close(); + } + } } diff --git a/src/main/java/net/bramp/sql/ResultSetFilter.java b/src/main/java/net/bramp/sql/ResultSetFilter.java index 48ec247..977b8de 100644 --- a/src/main/java/net/bramp/sql/ResultSetFilter.java +++ b/src/main/java/net/bramp/sql/ResultSetFilter.java @@ -25,852 +25,856 @@ import javax.annotation.Nonnull; /** - * Wraps an existing ResultSet, and filters out rows that don't match the predicate - * + * Wraps an existing ResultSet, and filters out rows that don't match the + * predicate + * * @author bramp * */ public class ResultSetFilter implements ResultSet { - final ResultSet rs; - final Predicate predicate; - - public interface Predicate { - /** - * Return true if this row is acceptable - * Return false if it is not - * Do not call next() - * @param rs - * @return - * @throws SQLException - */ - public boolean apply(@Nonnull ResultSet rs) throws SQLException; - } - - public ResultSetFilter(@Nonnull ResultSet rs, @Nonnull Predicate predicate) { - this.rs = rs; - this.predicate = predicate; - } - - public T unwrap(Class iface) throws SQLException { - return rs.unwrap(iface); - } - - public boolean isWrapperFor(Class iface) throws SQLException { - return rs.isWrapperFor(iface); - } - - public boolean next() throws SQLException { - while(rs.next()) { - if (predicate.apply(rs)) - return true; - } - return false; - } - - public void close() throws SQLException { - rs.close(); - } - - public boolean wasNull() throws SQLException { - return rs.wasNull(); - } - - public String getString(int columnIndex) throws SQLException { - return rs.getString(columnIndex); - } - - public boolean getBoolean(int columnIndex) throws SQLException { - return rs.getBoolean(columnIndex); - } - - public byte getByte(int columnIndex) throws SQLException { - return rs.getByte(columnIndex); - } - - public short getShort(int columnIndex) throws SQLException { - return rs.getShort(columnIndex); - } - - public int getInt(int columnIndex) throws SQLException { - return rs.getInt(columnIndex); - } - - public long getLong(int columnIndex) throws SQLException { - return rs.getLong(columnIndex); - } - - public float getFloat(int columnIndex) throws SQLException { - return rs.getFloat(columnIndex); - } - - public double getDouble(int columnIndex) throws SQLException { - return rs.getDouble(columnIndex); - } - - @Deprecated - public BigDecimal getBigDecimal(int columnIndex, int scale) - throws SQLException { - return rs.getBigDecimal(columnIndex, scale); - } - - public byte[] getBytes(int columnIndex) throws SQLException { - return rs.getBytes(columnIndex); - } - - public Date getDate(int columnIndex) throws SQLException { - return rs.getDate(columnIndex); - } - - public Time getTime(int columnIndex) throws SQLException { - return rs.getTime(columnIndex); - } - - public Timestamp getTimestamp(int columnIndex) throws SQLException { - return rs.getTimestamp(columnIndex); - } - - public InputStream getAsciiStream(int columnIndex) throws SQLException { - return rs.getAsciiStream(columnIndex); - } - - @Deprecated - public InputStream getUnicodeStream(int columnIndex) throws SQLException { - return rs.getUnicodeStream(columnIndex); - } - - public InputStream getBinaryStream(int columnIndex) throws SQLException { - return rs.getBinaryStream(columnIndex); - } - - public String getString(String columnLabel) throws SQLException { - return rs.getString(columnLabel); - } - - public boolean getBoolean(String columnLabel) throws SQLException { - return rs.getBoolean(columnLabel); - } - - public byte getByte(String columnLabel) throws SQLException { - return rs.getByte(columnLabel); - } - - public short getShort(String columnLabel) throws SQLException { - return rs.getShort(columnLabel); - } - - public int getInt(String columnLabel) throws SQLException { - return rs.getInt(columnLabel); - } - - public long getLong(String columnLabel) throws SQLException { - return rs.getLong(columnLabel); - } - - public float getFloat(String columnLabel) throws SQLException { - return rs.getFloat(columnLabel); - } - - public double getDouble(String columnLabel) throws SQLException { - return rs.getDouble(columnLabel); - } - - @Deprecated - public BigDecimal getBigDecimal(String columnLabel, int scale) - throws SQLException { - return rs.getBigDecimal(columnLabel, scale); - } - - public byte[] getBytes(String columnLabel) throws SQLException { - return rs.getBytes(columnLabel); - } - - public Date getDate(String columnLabel) throws SQLException { - return rs.getDate(columnLabel); - } - - public Time getTime(String columnLabel) throws SQLException { - return rs.getTime(columnLabel); - } - - public Timestamp getTimestamp(String columnLabel) throws SQLException { - return rs.getTimestamp(columnLabel); - } - - public InputStream getAsciiStream(String columnLabel) throws SQLException { - return rs.getAsciiStream(columnLabel); - } - - @Deprecated - public InputStream getUnicodeStream(String columnLabel) throws SQLException { - return rs.getUnicodeStream(columnLabel); - } - - public InputStream getBinaryStream(String columnLabel) throws SQLException { - return rs.getBinaryStream(columnLabel); - } - - public SQLWarning getWarnings() throws SQLException { - return rs.getWarnings(); - } - public void clearWarnings() throws SQLException { - rs.clearWarnings(); - } + final ResultSet rs; + final Predicate predicate; + + public interface Predicate { + + /** + * Return true if this row is acceptable Return false if it is not Do + * not call next() + * + * @param rs + * @return + * @throws SQLException + */ + public boolean apply(@Nonnull ResultSet rs) throws SQLException; + } + + public ResultSetFilter(@Nonnull ResultSet rs, @Nonnull Predicate predicate) { + this.rs = rs; + this.predicate = predicate; + } + + public T unwrap(Class iface) throws SQLException { + return rs.unwrap(iface); + } + + public boolean isWrapperFor(Class iface) throws SQLException { + return rs.isWrapperFor(iface); + } + + public boolean next() throws SQLException { + while (rs.next()) { + if (predicate.apply(rs)) { + return true; + } + } + return false; + } + + public void close() throws SQLException { + rs.close(); + } + + public boolean wasNull() throws SQLException { + return rs.wasNull(); + } + + public String getString(int columnIndex) throws SQLException { + return rs.getString(columnIndex); + } + + public boolean getBoolean(int columnIndex) throws SQLException { + return rs.getBoolean(columnIndex); + } + + public byte getByte(int columnIndex) throws SQLException { + return rs.getByte(columnIndex); + } + + public short getShort(int columnIndex) throws SQLException { + return rs.getShort(columnIndex); + } + + public int getInt(int columnIndex) throws SQLException { + return rs.getInt(columnIndex); + } + + public long getLong(int columnIndex) throws SQLException { + return rs.getLong(columnIndex); + } + + public float getFloat(int columnIndex) throws SQLException { + return rs.getFloat(columnIndex); + } + + public double getDouble(int columnIndex) throws SQLException { + return rs.getDouble(columnIndex); + } + + @Deprecated + public BigDecimal getBigDecimal(int columnIndex, int scale) + throws SQLException { + return rs.getBigDecimal(columnIndex, scale); + } + + public byte[] getBytes(int columnIndex) throws SQLException { + return rs.getBytes(columnIndex); + } + + public Date getDate(int columnIndex) throws SQLException { + return rs.getDate(columnIndex); + } + + public Time getTime(int columnIndex) throws SQLException { + return rs.getTime(columnIndex); + } + + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return rs.getTimestamp(columnIndex); + } + + public InputStream getAsciiStream(int columnIndex) throws SQLException { + return rs.getAsciiStream(columnIndex); + } + + @Deprecated + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + return rs.getUnicodeStream(columnIndex); + } + + public InputStream getBinaryStream(int columnIndex) throws SQLException { + return rs.getBinaryStream(columnIndex); + } + + public String getString(String columnLabel) throws SQLException { + return rs.getString(columnLabel); + } + + public boolean getBoolean(String columnLabel) throws SQLException { + return rs.getBoolean(columnLabel); + } + + public byte getByte(String columnLabel) throws SQLException { + return rs.getByte(columnLabel); + } + + public short getShort(String columnLabel) throws SQLException { + return rs.getShort(columnLabel); + } + + public int getInt(String columnLabel) throws SQLException { + return rs.getInt(columnLabel); + } + + public long getLong(String columnLabel) throws SQLException { + return rs.getLong(columnLabel); + } + + public float getFloat(String columnLabel) throws SQLException { + return rs.getFloat(columnLabel); + } + + public double getDouble(String columnLabel) throws SQLException { + return rs.getDouble(columnLabel); + } + + @Deprecated + public BigDecimal getBigDecimal(String columnLabel, int scale) + throws SQLException { + return rs.getBigDecimal(columnLabel, scale); + } + + public byte[] getBytes(String columnLabel) throws SQLException { + return rs.getBytes(columnLabel); + } + + public Date getDate(String columnLabel) throws SQLException { + return rs.getDate(columnLabel); + } + + public Time getTime(String columnLabel) throws SQLException { + return rs.getTime(columnLabel); + } + + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return rs.getTimestamp(columnLabel); + } + + public InputStream getAsciiStream(String columnLabel) throws SQLException { + return rs.getAsciiStream(columnLabel); + } + + @Deprecated + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + return rs.getUnicodeStream(columnLabel); + } + + public InputStream getBinaryStream(String columnLabel) throws SQLException { + return rs.getBinaryStream(columnLabel); + } + + public SQLWarning getWarnings() throws SQLException { + return rs.getWarnings(); + } + + public void clearWarnings() throws SQLException { + rs.clearWarnings(); + } - public String getCursorName() throws SQLException { - return rs.getCursorName(); - } + public String getCursorName() throws SQLException { + return rs.getCursorName(); + } - public ResultSetMetaData getMetaData() throws SQLException { - return rs.getMetaData(); - } + public ResultSetMetaData getMetaData() throws SQLException { + return rs.getMetaData(); + } - public Object getObject(int columnIndex) throws SQLException { - return rs.getObject(columnIndex); - } + public Object getObject(int columnIndex) throws SQLException { + return rs.getObject(columnIndex); + } - public Object getObject(String columnLabel) throws SQLException { - return rs.getObject(columnLabel); - } + public Object getObject(String columnLabel) throws SQLException { + return rs.getObject(columnLabel); + } - public int findColumn(String columnLabel) throws SQLException { - return rs.findColumn(columnLabel); - } + public int findColumn(String columnLabel) throws SQLException { + return rs.findColumn(columnLabel); + } - public Reader getCharacterStream(int columnIndex) throws SQLException { - return rs.getCharacterStream(columnIndex); - } + public Reader getCharacterStream(int columnIndex) throws SQLException { + return rs.getCharacterStream(columnIndex); + } - public Reader getCharacterStream(String columnLabel) throws SQLException { - return rs.getCharacterStream(columnLabel); - } + public Reader getCharacterStream(String columnLabel) throws SQLException { + return rs.getCharacterStream(columnLabel); + } - public BigDecimal getBigDecimal(int columnIndex) throws SQLException { - return rs.getBigDecimal(columnIndex); - } + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + return rs.getBigDecimal(columnIndex); + } - public BigDecimal getBigDecimal(String columnLabel) throws SQLException { - return rs.getBigDecimal(columnLabel); - } + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return rs.getBigDecimal(columnLabel); + } - public boolean isBeforeFirst() throws SQLException { - return rs.isBeforeFirst(); - } + public boolean isBeforeFirst() throws SQLException { + return rs.isBeforeFirst(); + } - public boolean isAfterLast() throws SQLException { - return rs.isAfterLast(); - } + public boolean isAfterLast() throws SQLException { + return rs.isAfterLast(); + } - public boolean isFirst() throws SQLException { - return rs.isFirst(); - } + public boolean isFirst() throws SQLException { + return rs.isFirst(); + } - public boolean isLast() throws SQLException { - return rs.isLast(); - } + public boolean isLast() throws SQLException { + return rs.isLast(); + } - public void beforeFirst() throws SQLException { - rs.beforeFirst(); - } - - public void afterLast() throws SQLException { - rs.afterLast(); - } - - public boolean first() throws SQLException { - return rs.first(); - } - - public boolean last() throws SQLException { - return rs.last(); - } - - public int getRow() throws SQLException { - return rs.getRow(); - } - - public boolean absolute(int row) throws SQLException { - return rs.absolute(row); - } - - public boolean relative(int rows) throws SQLException { - return rs.relative(rows); - } - - public boolean previous() throws SQLException { - return rs.previous(); - } - - public void setFetchDirection(int direction) throws SQLException { - rs.setFetchDirection(direction); - } + public void beforeFirst() throws SQLException { + rs.beforeFirst(); + } + + public void afterLast() throws SQLException { + rs.afterLast(); + } + + public boolean first() throws SQLException { + return rs.first(); + } + + public boolean last() throws SQLException { + return rs.last(); + } + + public int getRow() throws SQLException { + return rs.getRow(); + } + + public boolean absolute(int row) throws SQLException { + return rs.absolute(row); + } + + public boolean relative(int rows) throws SQLException { + return rs.relative(rows); + } + + public boolean previous() throws SQLException { + return rs.previous(); + } + + public void setFetchDirection(int direction) throws SQLException { + rs.setFetchDirection(direction); + } - public int getFetchDirection() throws SQLException { - return rs.getFetchDirection(); - } - - public void setFetchSize(int rows) throws SQLException { - rs.setFetchSize(rows); - } - - public int getFetchSize() throws SQLException { - return rs.getFetchSize(); - } - - public int getType() throws SQLException { - return rs.getType(); - } - - public int getConcurrency() throws SQLException { - return rs.getConcurrency(); - } - - public boolean rowUpdated() throws SQLException { - return rs.rowUpdated(); - } - - public boolean rowInserted() throws SQLException { - return rs.rowInserted(); - } - - public boolean rowDeleted() throws SQLException { - return rs.rowDeleted(); - } - - public void updateNull(int columnIndex) throws SQLException { - rs.updateNull(columnIndex); - } - - public void updateBoolean(int columnIndex, boolean x) throws SQLException { - rs.updateBoolean(columnIndex, x); - } - - public void updateByte(int columnIndex, byte x) throws SQLException { - rs.updateByte(columnIndex, x); - } - - public void updateShort(int columnIndex, short x) throws SQLException { - rs.updateShort(columnIndex, x); - } - - public void updateInt(int columnIndex, int x) throws SQLException { - rs.updateInt(columnIndex, x); - } - - public void updateLong(int columnIndex, long x) throws SQLException { - rs.updateLong(columnIndex, x); - } - - public void updateFloat(int columnIndex, float x) throws SQLException { - rs.updateFloat(columnIndex, x); - } - - public void updateDouble(int columnIndex, double x) throws SQLException { - rs.updateDouble(columnIndex, x); - } - - public void updateBigDecimal(int columnIndex, BigDecimal x) - throws SQLException { - rs.updateBigDecimal(columnIndex, x); - } - - public void updateString(int columnIndex, String x) throws SQLException { - rs.updateString(columnIndex, x); - } - - public void updateBytes(int columnIndex, byte[] x) throws SQLException { - rs.updateBytes(columnIndex, x); - } - - public void updateDate(int columnIndex, Date x) throws SQLException { - rs.updateDate(columnIndex, x); - } - - public void updateTime(int columnIndex, Time x) throws SQLException { - rs.updateTime(columnIndex, x); - } - - public void updateTimestamp(int columnIndex, Timestamp x) - throws SQLException { - rs.updateTimestamp(columnIndex, x); - } - - public void updateAsciiStream(int columnIndex, InputStream x, int length) - throws SQLException { - rs.updateAsciiStream(columnIndex, x, length); - } - - public void updateBinaryStream(int columnIndex, InputStream x, int length) - throws SQLException { - rs.updateBinaryStream(columnIndex, x, length); - } - - public void updateCharacterStream(int columnIndex, Reader x, int length) - throws SQLException { - rs.updateCharacterStream(columnIndex, x, length); - } - - public void updateObject(int columnIndex, Object x, int scaleOrLength) - throws SQLException { - rs.updateObject(columnIndex, x, scaleOrLength); - } - - public void updateObject(int columnIndex, Object x) throws SQLException { - rs.updateObject(columnIndex, x); - } - - public void updateNull(String columnLabel) throws SQLException { - rs.updateNull(columnLabel); - } - - public void updateBoolean(String columnLabel, boolean x) - throws SQLException { - rs.updateBoolean(columnLabel, x); - } - - public void updateByte(String columnLabel, byte x) throws SQLException { - rs.updateByte(columnLabel, x); - } - - public void updateShort(String columnLabel, short x) throws SQLException { - rs.updateShort(columnLabel, x); - } - - public void updateInt(String columnLabel, int x) throws SQLException { - rs.updateInt(columnLabel, x); - } - - public void updateLong(String columnLabel, long x) throws SQLException { - rs.updateLong(columnLabel, x); - } - - public void updateFloat(String columnLabel, float x) throws SQLException { - rs.updateFloat(columnLabel, x); - } - - public void updateDouble(String columnLabel, double x) throws SQLException { - rs.updateDouble(columnLabel, x); - } - - public void updateBigDecimal(String columnLabel, BigDecimal x) - throws SQLException { - rs.updateBigDecimal(columnLabel, x); - } - - public void updateString(String columnLabel, String x) throws SQLException { - rs.updateString(columnLabel, x); - } - - public void updateBytes(String columnLabel, byte[] x) throws SQLException { - rs.updateBytes(columnLabel, x); - } - - public void updateDate(String columnLabel, Date x) throws SQLException { - rs.updateDate(columnLabel, x); - } - - public void updateTime(String columnLabel, Time x) throws SQLException { - rs.updateTime(columnLabel, x); - } - - public void updateTimestamp(String columnLabel, Timestamp x) - throws SQLException { - rs.updateTimestamp(columnLabel, x); - } - - public void updateAsciiStream(String columnLabel, InputStream x, int length) - throws SQLException { - rs.updateAsciiStream(columnLabel, x, length); - } - - public void updateBinaryStream(String columnLabel, InputStream x, int length) - throws SQLException { - rs.updateBinaryStream(columnLabel, x, length); - } - - public void updateCharacterStream(String columnLabel, Reader reader, - int length) throws SQLException { - rs.updateCharacterStream(columnLabel, reader, length); - } - - public void updateObject(String columnLabel, Object x, int scaleOrLength) - throws SQLException { - rs.updateObject(columnLabel, x, scaleOrLength); - } - - public void updateObject(String columnLabel, Object x) throws SQLException { - rs.updateObject(columnLabel, x); - } - - public void insertRow() throws SQLException { - rs.insertRow(); - } - - public void updateRow() throws SQLException { - rs.updateRow(); - } - - public void deleteRow() throws SQLException { - rs.deleteRow(); - } - - public void refreshRow() throws SQLException { - rs.refreshRow(); - } - - public void cancelRowUpdates() throws SQLException { - rs.cancelRowUpdates(); - } - - public void moveToInsertRow() throws SQLException { - rs.moveToInsertRow(); - } - - public void moveToCurrentRow() throws SQLException { - rs.moveToCurrentRow(); - } - - public Statement getStatement() throws SQLException { - return rs.getStatement(); - } - - public Object getObject(int columnIndex, Map> map) - throws SQLException { - return rs.getObject(columnIndex, map); - } - - public Ref getRef(int columnIndex) throws SQLException { - return rs.getRef(columnIndex); - } - - public Blob getBlob(int columnIndex) throws SQLException { - return rs.getBlob(columnIndex); - } - - public Clob getClob(int columnIndex) throws SQLException { - return rs.getClob(columnIndex); - } - - public Array getArray(int columnIndex) throws SQLException { - return rs.getArray(columnIndex); - } - - public Object getObject(String columnLabel, Map> map) - throws SQLException { - return rs.getObject(columnLabel, map); - } - - public Ref getRef(String columnLabel) throws SQLException { - return rs.getRef(columnLabel); - } - - public Blob getBlob(String columnLabel) throws SQLException { - return rs.getBlob(columnLabel); - } - - public Clob getClob(String columnLabel) throws SQLException { - return rs.getClob(columnLabel); - } - - public Array getArray(String columnLabel) throws SQLException { - return rs.getArray(columnLabel); - } - - public Date getDate(int columnIndex, Calendar cal) throws SQLException { - return rs.getDate(columnIndex, cal); - } - - public Date getDate(String columnLabel, Calendar cal) throws SQLException { - return rs.getDate(columnLabel, cal); - } - - public Time getTime(int columnIndex, Calendar cal) throws SQLException { - return rs.getTime(columnIndex, cal); - } - - public Time getTime(String columnLabel, Calendar cal) throws SQLException { - return rs.getTime(columnLabel, cal); - } - - public Timestamp getTimestamp(int columnIndex, Calendar cal) - throws SQLException { - return rs.getTimestamp(columnIndex, cal); - } - - public Timestamp getTimestamp(String columnLabel, Calendar cal) - throws SQLException { - return rs.getTimestamp(columnLabel, cal); - } - - public URL getURL(int columnIndex) throws SQLException { - return rs.getURL(columnIndex); - } - - public URL getURL(String columnLabel) throws SQLException { - return rs.getURL(columnLabel); - } - - public void updateRef(int columnIndex, Ref x) throws SQLException { - rs.updateRef(columnIndex, x); - } - - public void updateRef(String columnLabel, Ref x) throws SQLException { - rs.updateRef(columnLabel, x); - } - - public void updateBlob(int columnIndex, Blob x) throws SQLException { - rs.updateBlob(columnIndex, x); - } - - public void updateBlob(String columnLabel, Blob x) throws SQLException { - rs.updateBlob(columnLabel, x); - } - - public void updateClob(int columnIndex, Clob x) throws SQLException { - rs.updateClob(columnIndex, x); - } - - public void updateClob(String columnLabel, Clob x) throws SQLException { - rs.updateClob(columnLabel, x); - } - - public void updateArray(int columnIndex, Array x) throws SQLException { - rs.updateArray(columnIndex, x); - } - - public void updateArray(String columnLabel, Array x) throws SQLException { - rs.updateArray(columnLabel, x); - } - - public RowId getRowId(int columnIndex) throws SQLException { - return rs.getRowId(columnIndex); - } - - public RowId getRowId(String columnLabel) throws SQLException { - return rs.getRowId(columnLabel); - } - - public void updateRowId(int columnIndex, RowId x) throws SQLException { - rs.updateRowId(columnIndex, x); - } - - public void updateRowId(String columnLabel, RowId x) throws SQLException { - rs.updateRowId(columnLabel, x); - } - - public int getHoldability() throws SQLException { - return rs.getHoldability(); - } - - public boolean isClosed() throws SQLException { - return rs.isClosed(); - } - - public void updateNString(int columnIndex, String nString) - throws SQLException { - rs.updateNString(columnIndex, nString); - } - - public void updateNString(String columnLabel, String nString) - throws SQLException { - rs.updateNString(columnLabel, nString); - } - - public void updateNClob(int columnIndex, NClob nClob) throws SQLException { - rs.updateNClob(columnIndex, nClob); - } - - public void updateNClob(String columnLabel, NClob nClob) - throws SQLException { - rs.updateNClob(columnLabel, nClob); - } - - public NClob getNClob(int columnIndex) throws SQLException { - return rs.getNClob(columnIndex); - } - - public NClob getNClob(String columnLabel) throws SQLException { - return rs.getNClob(columnLabel); - } - - public SQLXML getSQLXML(int columnIndex) throws SQLException { - return rs.getSQLXML(columnIndex); - } - - public SQLXML getSQLXML(String columnLabel) throws SQLException { - return rs.getSQLXML(columnLabel); - } - - public void updateSQLXML(int columnIndex, SQLXML xmlObject) - throws SQLException { - rs.updateSQLXML(columnIndex, xmlObject); - } - - public void updateSQLXML(String columnLabel, SQLXML xmlObject) - throws SQLException { - rs.updateSQLXML(columnLabel, xmlObject); - } - - public String getNString(int columnIndex) throws SQLException { - return rs.getNString(columnIndex); - } - - public String getNString(String columnLabel) throws SQLException { - return rs.getNString(columnLabel); - } - - public Reader getNCharacterStream(int columnIndex) throws SQLException { - return rs.getNCharacterStream(columnIndex); - } - - public Reader getNCharacterStream(String columnLabel) throws SQLException { - return rs.getNCharacterStream(columnLabel); - } - - public void updateNCharacterStream(int columnIndex, Reader x, long length) - throws SQLException { - rs.updateNCharacterStream(columnIndex, x, length); - } - - public void updateNCharacterStream(String columnLabel, Reader reader, - long length) throws SQLException { - rs.updateNCharacterStream(columnLabel, reader, length); - } - - public void updateAsciiStream(int columnIndex, InputStream x, long length) - throws SQLException { - rs.updateAsciiStream(columnIndex, x, length); - } - - public void updateBinaryStream(int columnIndex, InputStream x, long length) - throws SQLException { - rs.updateBinaryStream(columnIndex, x, length); - } - - public void updateCharacterStream(int columnIndex, Reader x, long length) - throws SQLException { - rs.updateCharacterStream(columnIndex, x, length); - } - - public void updateAsciiStream(String columnLabel, InputStream x, long length) - throws SQLException { - rs.updateAsciiStream(columnLabel, x, length); - } - - public void updateBinaryStream(String columnLabel, InputStream x, - long length) throws SQLException { - rs.updateBinaryStream(columnLabel, x, length); - } - - public void updateCharacterStream(String columnLabel, Reader reader, - long length) throws SQLException { - rs.updateCharacterStream(columnLabel, reader, length); - } - - public void updateBlob(int columnIndex, InputStream inputStream, long length) - throws SQLException { - rs.updateBlob(columnIndex, inputStream, length); - } - - public void updateBlob(String columnLabel, InputStream inputStream, - long length) throws SQLException { - rs.updateBlob(columnLabel, inputStream, length); - } - - public void updateClob(int columnIndex, Reader reader, long length) - throws SQLException { - rs.updateClob(columnIndex, reader, length); - } - - public void updateClob(String columnLabel, Reader reader, long length) - throws SQLException { - rs.updateClob(columnLabel, reader, length); - } - - public void updateNClob(int columnIndex, Reader reader, long length) - throws SQLException { - rs.updateNClob(columnIndex, reader, length); - } - - public void updateNClob(String columnLabel, Reader reader, long length) - throws SQLException { - rs.updateNClob(columnLabel, reader, length); - } - - public void updateNCharacterStream(int columnIndex, Reader x) - throws SQLException { - rs.updateNCharacterStream(columnIndex, x); - } - - public void updateNCharacterStream(String columnLabel, Reader reader) - throws SQLException { - rs.updateNCharacterStream(columnLabel, reader); - } - - public void updateAsciiStream(int columnIndex, InputStream x) - throws SQLException { - rs.updateAsciiStream(columnIndex, x); - } - - public void updateBinaryStream(int columnIndex, InputStream x) - throws SQLException { - rs.updateBinaryStream(columnIndex, x); - } - - public void updateCharacterStream(int columnIndex, Reader x) - throws SQLException { - rs.updateCharacterStream(columnIndex, x); - } - - public void updateAsciiStream(String columnLabel, InputStream x) - throws SQLException { - rs.updateAsciiStream(columnLabel, x); - } - - public void updateBinaryStream(String columnLabel, InputStream x) - throws SQLException { - rs.updateBinaryStream(columnLabel, x); - } - - public void updateCharacterStream(String columnLabel, Reader reader) - throws SQLException { - rs.updateCharacterStream(columnLabel, reader); - } - - public void updateBlob(int columnIndex, InputStream inputStream) - throws SQLException { - rs.updateBlob(columnIndex, inputStream); - } - - public void updateBlob(String columnLabel, InputStream inputStream) - throws SQLException { - rs.updateBlob(columnLabel, inputStream); - } - - public void updateClob(int columnIndex, Reader reader) throws SQLException { - rs.updateClob(columnIndex, reader); - } - - public void updateClob(String columnLabel, Reader reader) - throws SQLException { - rs.updateClob(columnLabel, reader); - } - - public void updateNClob(int columnIndex, Reader reader) throws SQLException { - rs.updateNClob(columnIndex, reader); - } - - public void updateNClob(String columnLabel, Reader reader) - throws SQLException { - rs.updateNClob(columnLabel, reader); - } - - public T getObject(int columnIndex, Class type) throws SQLException { - return rs.getObject(columnIndex, type); - } - - public T getObject(String columnLabel, Class type) - throws SQLException { - return rs.getObject(columnLabel, type); - } + public int getFetchDirection() throws SQLException { + return rs.getFetchDirection(); + } + + public void setFetchSize(int rows) throws SQLException { + rs.setFetchSize(rows); + } + + public int getFetchSize() throws SQLException { + return rs.getFetchSize(); + } + + public int getType() throws SQLException { + return rs.getType(); + } + + public int getConcurrency() throws SQLException { + return rs.getConcurrency(); + } + + public boolean rowUpdated() throws SQLException { + return rs.rowUpdated(); + } + + public boolean rowInserted() throws SQLException { + return rs.rowInserted(); + } + + public boolean rowDeleted() throws SQLException { + return rs.rowDeleted(); + } + + public void updateNull(int columnIndex) throws SQLException { + rs.updateNull(columnIndex); + } + + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + rs.updateBoolean(columnIndex, x); + } + + public void updateByte(int columnIndex, byte x) throws SQLException { + rs.updateByte(columnIndex, x); + } + + public void updateShort(int columnIndex, short x) throws SQLException { + rs.updateShort(columnIndex, x); + } + + public void updateInt(int columnIndex, int x) throws SQLException { + rs.updateInt(columnIndex, x); + } + + public void updateLong(int columnIndex, long x) throws SQLException { + rs.updateLong(columnIndex, x); + } + + public void updateFloat(int columnIndex, float x) throws SQLException { + rs.updateFloat(columnIndex, x); + } + + public void updateDouble(int columnIndex, double x) throws SQLException { + rs.updateDouble(columnIndex, x); + } + + public void updateBigDecimal(int columnIndex, BigDecimal x) + throws SQLException { + rs.updateBigDecimal(columnIndex, x); + } + + public void updateString(int columnIndex, String x) throws SQLException { + rs.updateString(columnIndex, x); + } + + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + rs.updateBytes(columnIndex, x); + } + + public void updateDate(int columnIndex, Date x) throws SQLException { + rs.updateDate(columnIndex, x); + } + + public void updateTime(int columnIndex, Time x) throws SQLException { + rs.updateTime(columnIndex, x); + } + + public void updateTimestamp(int columnIndex, Timestamp x) + throws SQLException { + rs.updateTimestamp(columnIndex, x); + } + + public void updateAsciiStream(int columnIndex, InputStream x, int length) + throws SQLException { + rs.updateAsciiStream(columnIndex, x, length); + } + + public void updateBinaryStream(int columnIndex, InputStream x, int length) + throws SQLException { + rs.updateBinaryStream(columnIndex, x, length); + } + + public void updateCharacterStream(int columnIndex, Reader x, int length) + throws SQLException { + rs.updateCharacterStream(columnIndex, x, length); + } + + public void updateObject(int columnIndex, Object x, int scaleOrLength) + throws SQLException { + rs.updateObject(columnIndex, x, scaleOrLength); + } + + public void updateObject(int columnIndex, Object x) throws SQLException { + rs.updateObject(columnIndex, x); + } + + public void updateNull(String columnLabel) throws SQLException { + rs.updateNull(columnLabel); + } + + public void updateBoolean(String columnLabel, boolean x) + throws SQLException { + rs.updateBoolean(columnLabel, x); + } + + public void updateByte(String columnLabel, byte x) throws SQLException { + rs.updateByte(columnLabel, x); + } + + public void updateShort(String columnLabel, short x) throws SQLException { + rs.updateShort(columnLabel, x); + } + + public void updateInt(String columnLabel, int x) throws SQLException { + rs.updateInt(columnLabel, x); + } + + public void updateLong(String columnLabel, long x) throws SQLException { + rs.updateLong(columnLabel, x); + } + + public void updateFloat(String columnLabel, float x) throws SQLException { + rs.updateFloat(columnLabel, x); + } + + public void updateDouble(String columnLabel, double x) throws SQLException { + rs.updateDouble(columnLabel, x); + } + + public void updateBigDecimal(String columnLabel, BigDecimal x) + throws SQLException { + rs.updateBigDecimal(columnLabel, x); + } + + public void updateString(String columnLabel, String x) throws SQLException { + rs.updateString(columnLabel, x); + } + + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + rs.updateBytes(columnLabel, x); + } + + public void updateDate(String columnLabel, Date x) throws SQLException { + rs.updateDate(columnLabel, x); + } + + public void updateTime(String columnLabel, Time x) throws SQLException { + rs.updateTime(columnLabel, x); + } + + public void updateTimestamp(String columnLabel, Timestamp x) + throws SQLException { + rs.updateTimestamp(columnLabel, x); + } + + public void updateAsciiStream(String columnLabel, InputStream x, int length) + throws SQLException { + rs.updateAsciiStream(columnLabel, x, length); + } + + public void updateBinaryStream(String columnLabel, InputStream x, int length) + throws SQLException { + rs.updateBinaryStream(columnLabel, x, length); + } + + public void updateCharacterStream(String columnLabel, Reader reader, + int length) throws SQLException { + rs.updateCharacterStream(columnLabel, reader, length); + } + + public void updateObject(String columnLabel, Object x, int scaleOrLength) + throws SQLException { + rs.updateObject(columnLabel, x, scaleOrLength); + } + + public void updateObject(String columnLabel, Object x) throws SQLException { + rs.updateObject(columnLabel, x); + } + + public void insertRow() throws SQLException { + rs.insertRow(); + } + + public void updateRow() throws SQLException { + rs.updateRow(); + } + + public void deleteRow() throws SQLException { + rs.deleteRow(); + } + + public void refreshRow() throws SQLException { + rs.refreshRow(); + } + + public void cancelRowUpdates() throws SQLException { + rs.cancelRowUpdates(); + } + + public void moveToInsertRow() throws SQLException { + rs.moveToInsertRow(); + } + + public void moveToCurrentRow() throws SQLException { + rs.moveToCurrentRow(); + } + + public Statement getStatement() throws SQLException { + return rs.getStatement(); + } + + public Object getObject(int columnIndex, Map> map) + throws SQLException { + return rs.getObject(columnIndex, map); + } + + public Ref getRef(int columnIndex) throws SQLException { + return rs.getRef(columnIndex); + } + + public Blob getBlob(int columnIndex) throws SQLException { + return rs.getBlob(columnIndex); + } + + public Clob getClob(int columnIndex) throws SQLException { + return rs.getClob(columnIndex); + } + + public Array getArray(int columnIndex) throws SQLException { + return rs.getArray(columnIndex); + } + + public Object getObject(String columnLabel, Map> map) + throws SQLException { + return rs.getObject(columnLabel, map); + } + + public Ref getRef(String columnLabel) throws SQLException { + return rs.getRef(columnLabel); + } + + public Blob getBlob(String columnLabel) throws SQLException { + return rs.getBlob(columnLabel); + } + + public Clob getClob(String columnLabel) throws SQLException { + return rs.getClob(columnLabel); + } + + public Array getArray(String columnLabel) throws SQLException { + return rs.getArray(columnLabel); + } + + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return rs.getDate(columnIndex, cal); + } + + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return rs.getDate(columnLabel, cal); + } + + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return rs.getTime(columnIndex, cal); + } + + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return rs.getTime(columnLabel, cal); + } + + public Timestamp getTimestamp(int columnIndex, Calendar cal) + throws SQLException { + return rs.getTimestamp(columnIndex, cal); + } + + public Timestamp getTimestamp(String columnLabel, Calendar cal) + throws SQLException { + return rs.getTimestamp(columnLabel, cal); + } + + public URL getURL(int columnIndex) throws SQLException { + return rs.getURL(columnIndex); + } + + public URL getURL(String columnLabel) throws SQLException { + return rs.getURL(columnLabel); + } + + public void updateRef(int columnIndex, Ref x) throws SQLException { + rs.updateRef(columnIndex, x); + } + + public void updateRef(String columnLabel, Ref x) throws SQLException { + rs.updateRef(columnLabel, x); + } + + public void updateBlob(int columnIndex, Blob x) throws SQLException { + rs.updateBlob(columnIndex, x); + } + + public void updateBlob(String columnLabel, Blob x) throws SQLException { + rs.updateBlob(columnLabel, x); + } + + public void updateClob(int columnIndex, Clob x) throws SQLException { + rs.updateClob(columnIndex, x); + } + + public void updateClob(String columnLabel, Clob x) throws SQLException { + rs.updateClob(columnLabel, x); + } + + public void updateArray(int columnIndex, Array x) throws SQLException { + rs.updateArray(columnIndex, x); + } + + public void updateArray(String columnLabel, Array x) throws SQLException { + rs.updateArray(columnLabel, x); + } + + public RowId getRowId(int columnIndex) throws SQLException { + return rs.getRowId(columnIndex); + } + + public RowId getRowId(String columnLabel) throws SQLException { + return rs.getRowId(columnLabel); + } + + public void updateRowId(int columnIndex, RowId x) throws SQLException { + rs.updateRowId(columnIndex, x); + } + + public void updateRowId(String columnLabel, RowId x) throws SQLException { + rs.updateRowId(columnLabel, x); + } + + public int getHoldability() throws SQLException { + return rs.getHoldability(); + } + + public boolean isClosed() throws SQLException { + return rs.isClosed(); + } + + public void updateNString(int columnIndex, String nString) + throws SQLException { + rs.updateNString(columnIndex, nString); + } + + public void updateNString(String columnLabel, String nString) + throws SQLException { + rs.updateNString(columnLabel, nString); + } + + public void updateNClob(int columnIndex, NClob nClob) throws SQLException { + rs.updateNClob(columnIndex, nClob); + } + + public void updateNClob(String columnLabel, NClob nClob) + throws SQLException { + rs.updateNClob(columnLabel, nClob); + } + + public NClob getNClob(int columnIndex) throws SQLException { + return rs.getNClob(columnIndex); + } + + public NClob getNClob(String columnLabel) throws SQLException { + return rs.getNClob(columnLabel); + } + + public SQLXML getSQLXML(int columnIndex) throws SQLException { + return rs.getSQLXML(columnIndex); + } + + public SQLXML getSQLXML(String columnLabel) throws SQLException { + return rs.getSQLXML(columnLabel); + } + + public void updateSQLXML(int columnIndex, SQLXML xmlObject) + throws SQLException { + rs.updateSQLXML(columnIndex, xmlObject); + } + + public void updateSQLXML(String columnLabel, SQLXML xmlObject) + throws SQLException { + rs.updateSQLXML(columnLabel, xmlObject); + } + + public String getNString(int columnIndex) throws SQLException { + return rs.getNString(columnIndex); + } + + public String getNString(String columnLabel) throws SQLException { + return rs.getNString(columnLabel); + } + + public Reader getNCharacterStream(int columnIndex) throws SQLException { + return rs.getNCharacterStream(columnIndex); + } + + public Reader getNCharacterStream(String columnLabel) throws SQLException { + return rs.getNCharacterStream(columnLabel); + } + + public void updateNCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + rs.updateNCharacterStream(columnIndex, x, length); + } + + public void updateNCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + rs.updateNCharacterStream(columnLabel, reader, length); + } + + public void updateAsciiStream(int columnIndex, InputStream x, long length) + throws SQLException { + rs.updateAsciiStream(columnIndex, x, length); + } + + public void updateBinaryStream(int columnIndex, InputStream x, long length) + throws SQLException { + rs.updateBinaryStream(columnIndex, x, length); + } + + public void updateCharacterStream(int columnIndex, Reader x, long length) + throws SQLException { + rs.updateCharacterStream(columnIndex, x, length); + } + + public void updateAsciiStream(String columnLabel, InputStream x, long length) + throws SQLException { + rs.updateAsciiStream(columnLabel, x, length); + } + + public void updateBinaryStream(String columnLabel, InputStream x, + long length) throws SQLException { + rs.updateBinaryStream(columnLabel, x, length); + } + + public void updateCharacterStream(String columnLabel, Reader reader, + long length) throws SQLException { + rs.updateCharacterStream(columnLabel, reader, length); + } + + public void updateBlob(int columnIndex, InputStream inputStream, long length) + throws SQLException { + rs.updateBlob(columnIndex, inputStream, length); + } + + public void updateBlob(String columnLabel, InputStream inputStream, + long length) throws SQLException { + rs.updateBlob(columnLabel, inputStream, length); + } + + public void updateClob(int columnIndex, Reader reader, long length) + throws SQLException { + rs.updateClob(columnIndex, reader, length); + } + + public void updateClob(String columnLabel, Reader reader, long length) + throws SQLException { + rs.updateClob(columnLabel, reader, length); + } + + public void updateNClob(int columnIndex, Reader reader, long length) + throws SQLException { + rs.updateNClob(columnIndex, reader, length); + } + + public void updateNClob(String columnLabel, Reader reader, long length) + throws SQLException { + rs.updateNClob(columnLabel, reader, length); + } + + public void updateNCharacterStream(int columnIndex, Reader x) + throws SQLException { + rs.updateNCharacterStream(columnIndex, x); + } + + public void updateNCharacterStream(String columnLabel, Reader reader) + throws SQLException { + rs.updateNCharacterStream(columnLabel, reader); + } + + public void updateAsciiStream(int columnIndex, InputStream x) + throws SQLException { + rs.updateAsciiStream(columnIndex, x); + } + + public void updateBinaryStream(int columnIndex, InputStream x) + throws SQLException { + rs.updateBinaryStream(columnIndex, x); + } + + public void updateCharacterStream(int columnIndex, Reader x) + throws SQLException { + rs.updateCharacterStream(columnIndex, x); + } + + public void updateAsciiStream(String columnLabel, InputStream x) + throws SQLException { + rs.updateAsciiStream(columnLabel, x); + } + + public void updateBinaryStream(String columnLabel, InputStream x) + throws SQLException { + rs.updateBinaryStream(columnLabel, x); + } + + public void updateCharacterStream(String columnLabel, Reader reader) + throws SQLException { + rs.updateCharacterStream(columnLabel, reader); + } + + public void updateBlob(int columnIndex, InputStream inputStream) + throws SQLException { + rs.updateBlob(columnIndex, inputStream); + } + + public void updateBlob(String columnLabel, InputStream inputStream) + throws SQLException { + rs.updateBlob(columnLabel, inputStream); + } + + public void updateClob(int columnIndex, Reader reader) throws SQLException { + rs.updateClob(columnIndex, reader); + } + + public void updateClob(String columnLabel, Reader reader) + throws SQLException { + rs.updateClob(columnLabel, reader); + } + + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + rs.updateNClob(columnIndex, reader); + } + + public void updateNClob(String columnLabel, Reader reader) + throws SQLException { + rs.updateNClob(columnLabel, reader); + } + + public T getObject(int columnIndex, Class type) throws SQLException { + return rs.getObject(columnIndex, type); + } + + public T getObject(String columnLabel, Class type) + throws SQLException { + return rs.getObject(columnLabel, type); + } } diff --git a/src/main/java/net/bramp/sql/ResultSets.java b/src/main/java/net/bramp/sql/ResultSets.java index 33faf2b..6759874 100644 --- a/src/main/java/net/bramp/sql/ResultSets.java +++ b/src/main/java/net/bramp/sql/ResultSets.java @@ -4,18 +4,21 @@ import java.sql.SQLException; public final class ResultSets { - private ResultSets() {} - - public static String toString(ResultSet rs) throws SQLException { - StringBuilder sb = new StringBuilder(); - int cols = rs.getMetaData().getColumnCount(); - for (int i = 1; i <= cols; i++) { - sb.append('"').append( rs.getString(i) ).append('"').append( ", "); - } - if (cols > 0) - sb.setLength( sb.length() - 2); + private ResultSets() { + } - return sb.toString(); - } + public static String toString(ResultSet rs) throws SQLException { + StringBuilder sb = new StringBuilder(); + int cols = rs.getMetaData().getColumnCount(); + for (int i = 1; i <= cols; i++) { + sb.append('"').append(rs.getString(i)).append('"').append(", "); + } + + if (cols > 0) { + sb.setLength(sb.length() - 2); + } + + return sb.toString(); + } }