Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import scala.Option;
Expand Down Expand Up @@ -92,10 +93,10 @@ public SQLConnection execute(String sql, Handler<AsyncResult<Void>> handler) {
beginTransactionIfNeeded(v -> {
final scala.concurrent.Future<QueryResult> future = connection.sendQuery(sql);
future.onComplete(ScalaUtils.<QueryResult>toFunction1(ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.failedFuture(ar.cause()));
}
}), executionContext);
});
Expand All @@ -107,14 +108,7 @@ public SQLConnection execute(String sql, Handler<AsyncResult<Void>> handler) {
public SQLConnection query(String sql, Handler<AsyncResult<ResultSet>> handler) {
beginTransactionIfNeeded(v -> {
final Future<QueryResult> future = ScalaUtils.scalaToVertx(connection.sendQuery(sql), executionContext);
future.setHandler(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture(queryResultToResultSet(ar.result())));
} else {
handler.handle(Future.failedFuture(ar.cause()));
}

});
future.setHandler(handleAsyncQueryResultToResultSet(handler));
});

return this;
Expand All @@ -125,14 +119,7 @@ public SQLConnection queryWithParams(String sql, JsonArray params, Handler<Async
beginTransactionIfNeeded(v -> {
final scala.concurrent.Future<QueryResult> future = connection.sendPreparedStatement(sql,
ScalaUtils.toScalaList(params.getList()));
future.onComplete(ScalaUtils.<QueryResult>toFunction1(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture(queryResultToResultSet(ar.result())));
} else {
handler.handle(Future.failedFuture(ar.cause()));
}

}), executionContext);
future.onComplete(ScalaUtils.toFunction1(handleAsyncQueryResultToResultSet(handler)), executionContext);
});

return this;
Expand All @@ -143,10 +130,14 @@ public SQLConnection update(String sql, Handler<AsyncResult<UpdateResult>> handl
beginTransactionIfNeeded(v -> {
final scala.concurrent.Future<QueryResult> future = connection.sendQuery(sql);
future.onComplete(ScalaUtils.<QueryResult>toFunction1(ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
if (ar.succeeded()) {
try {
handler.handle(Future.succeededFuture(queryResultToUpdateResult(ar.result())));
} catch (Throwable e) {
handler.handle(Future.failedFuture(e));
}
} else {
handler.handle(Future.succeededFuture(queryResultToUpdateResult(ar.result())));
handler.handle(Future.failedFuture(ar.cause()));
}
}), executionContext);
});
Expand All @@ -160,10 +151,10 @@ public SQLConnection updateWithParams(String sql, JsonArray params, Handler<Asyn
final scala.concurrent.Future<QueryResult> future = connection.sendPreparedStatement(sql,
ScalaUtils.toScalaList(params.getList()));
future.onComplete(ScalaUtils.<QueryResult>toFunction1(ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
try {
handler.handle(Future.succeededFuture(queryResultToUpdateResult(ar.result())));
} catch (Throwable e) {
handler.handle(Future.failedFuture(e));
}
}), executionContext);
});
Expand Down Expand Up @@ -241,6 +232,20 @@ private synchronized void beginTransactionIfNeeded(Handler<AsyncResult<Void>> ac
}
}

private Handler<AsyncResult<QueryResult>> handleAsyncQueryResultToResultSet(Handler<AsyncResult<ResultSet>> handler) {
return ar -> {
if (ar.succeeded()) {
try {
handler.handle(Future.succeededFuture(queryResultToResultSet(ar.result())));
} catch (Throwable e) {
handler.handle(Future.failedFuture(e));
}
} else {
handler.handle(Future.failedFuture(ar.cause()));
}
};
}

private ResultSet queryResultToResultSet(QueryResult qr) {
final Option<com.github.mauricio.async.db.ResultSet> rows = qr.rows();
if (!rows.isDefined()) {
Expand Down Expand Up @@ -280,6 +285,8 @@ public Void apply(Object value) {
array.add(value.toString());
} else if (value instanceof LocalDate) {
array.add(value.toString());
} else if (value instanceof DateTime) {
array.add(value.toString());
} else if (value instanceof UUID) {
array.add(value.toString());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.github.mauricio.async.db.Configuration;
import com.github.mauricio.async.db.Connection;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/io/vertx/ext/asyncsql/AbstractTestBase.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.ext.asyncsql;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -43,4 +44,13 @@ protected void ensureSuccess(TestContext context, AsyncResult result) {
}
}

protected <A> Handler<AsyncResult<A>> onSuccess(TestContext context, Handler<A> fn) {
return ar -> {
if (ar.succeeded()) {
fn.handle(ar.result());
} else {
context.fail("Should have been a success");
}
};
}
}
1 change: 0 additions & 1 deletion src/test/java/io/vertx/ext/asyncsql/MySQLClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.vertx.core.Handler;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.ext.unit.Async;
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/io/vertx/ext/asyncsql/PostgreSQLClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

package io.vertx.ext.asyncsql;

import com.github.mauricio.async.db.QueryResult;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import org.junit.Before;
Expand Down
103 changes: 61 additions & 42 deletions src/test/java/io/vertx/ext/asyncsql/PostgreSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,83 @@

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.test.core.VertxTestBase;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

/**
* @author <a href="http://www.campudus.com">Joern Bernhardt</a>.
*/
public class PostgreSQLTest extends VertxTestBase {

AsyncSQLClient asyncSqlClient;

final String address = "campudus.postgresql";
public class PostgreSQLTest extends AbstractTestBase {

final JsonObject config = new JsonObject()
.put("host", System.getProperty("db.host", "localhost"))
.put("postgresql", new JsonObject().put("address", address));
.put("host", System.getProperty("db.host", "localhost"));

@Override
public void setUp() throws Exception {
super.setUp();
asyncSqlClient = PostgreSQLClient.createNonShared(vertx, config);
@Before
public void init() {
client = PostgreSQLClient.createNonShared(vertx, config);
}

@Override
public void tearDown() throws Exception {
CountDownLatch latch;
if (this.asyncSqlClient != null) {
latch = new CountDownLatch(1);
this.asyncSqlClient.close((ar) -> {
latch.countDown();
@Test
public void someTest(TestContext context) throws Exception {
Async async = context.async();
client.getConnection(connAr -> {
ensureSuccess(context, connAr);
conn = connAr.result();
conn.query("SELECT 1 AS something", resultSetAr -> {
ensureSuccess(context, resultSetAr);
ResultSet resultSet = resultSetAr.result();
context.assertNotNull(resultSet);
context.assertNotNull(resultSet.getColumnNames());
context.assertNotNull(resultSet.getResults());
context.assertEquals(new JsonArray().add(1), resultSet.getResults().get(0));
async.complete();
});
this.awaitLatch(latch);
}

super.tearDown();
});
}

@Test
public void someTest() throws Exception {
asyncSqlClient.getConnection(onSuccess(conn -> {
conn.query("SELECT 1 AS something", onSuccess(resultSet -> {
System.out.println(resultSet.getResults());
assertNotNull(resultSet);
assertNotNull(resultSet.getColumnNames());
assertNotNull(resultSet.getResults());
assertEquals(new JsonArray().add(1), resultSet.getResults().get(0));
conn.close((ar) -> {
if (ar.succeeded()) {
testComplete();
} else {
fail("should be able to close the asyncSqlClient");
}
});
public void queryTypeTimestampWithTimezoneTest(TestContext context) throws Exception {
Async async = context.async();
client.getConnection(connAr -> {
ensureSuccess(context, connAr);
conn = connAr.result();
conn.execute("DROP TABLE IF EXISTS test_table", onSuccess(context, dropped -> {
conn.execute("CREATE TABLE IF NOT EXISTS test_table (ts timestamp with time zone)", onSuccess(context, created -> {
conn.execute("INSERT INTO test_table (ts) VALUES (now())", onSuccess(context, inserted -> {
conn.query("SELECT * FROM test_table;", onSuccess(context, timestampSelect -> {
context.assertNotNull(timestampSelect);
context.assertNotNull(timestampSelect.getResults());
context.assertTrue(timestampSelect.getResults().get(0).getString(0).matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(Z|[+-]\\d{2}:\\d{2})"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmlopes this would be the check I mentioned over there: 14bb98b#commitcomment-16058891

async.complete();
}));
}));
}));
}));
}));
});
}

await();
@Test
public void queryTypeTimestampWithoutTimezoneTest(TestContext context) throws Exception {
Async async = context.async();
client.getConnection(connAr -> {
ensureSuccess(context, connAr);
conn = connAr.result();
conn.execute("DROP TABLE IF EXISTS test_table", onSuccess(context, dropped -> {
conn.execute("CREATE TABLE IF NOT EXISTS test_table (ts timestamp without time zone)", onSuccess(context, created -> {
conn.execute("INSERT INTO test_table (ts) VALUES (now())", onSuccess(context, inserted -> {
conn.query("SELECT * FROM test_table;", onSuccess(context, timestampSelect -> {
context.assertNotNull(timestampSelect);
context.assertNotNull(timestampSelect.getResults());
context.assertTrue(timestampSelect.getResults().get(0).getString(0).matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmlopes and this checks that there is no timezone info available here.

async.complete();
}));
}));
}));
}));
});
}

}