Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions demo/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ struct client_ssl_functor_t
cql::cql_client_t::cql_log_callback_t _log_callback;
};

static bool terminate = false;

static void* workThread( void* args ) {
cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args;

while ( !terminate ) {
// execute a query, select all rows from the keyspace
boost::shared_future<cql::cql_future_result_t> future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE);

// wait for the query to execute
future.wait();

// check whether the query succeeded
std::cout << "select successfull? " << (!future.get().error.is_err() ? "true" : "false") << std::endl;

if (future.get().error.is_err()) {
// Don't let this disappear in the log messages.
std::cout << "CATASTROPHIC ERROR: " << future.get().error.message << std::endl;
exit(1);
}

if (future.get().result) {
// print the rows return by the successful query
print_rows(*future.get().result);
}
}

return 0;
}


int
main(int argc,
char**)
Expand Down Expand Up @@ -142,18 +173,28 @@ main(int argc,
// Construct the pool
std::auto_ptr<cql::cql_client_pool_t> pool(cql::cql_client_pool_factory_t::create_client_pool_t(client_factory, NULL, NULL));

// Add a client to the pool, this operation returns a future.
boost::shared_future<cql::cql_future_connection_t> connect_future = pool->add_client("localhost", 9042);
const int numthreads = 3;

// TODO: What yields the best performance if the number of work threads is larger that
// that of available backends? One connection per backend in the pool or enouch
// identical connections to serve all client threads?

// Wait until the connection is complete, or has failed.
connect_future.wait();
for (int i = 0; i < numthreads; i++) {
// Add a client to the pool, this operation returns a future.
boost::shared_future<cql::cql_future_connection_t> connect_future = pool->add_client("localhost", 9042);

// Wait until the connection is complete, or has failed.
connect_future.wait();

// Check whether or not the connection was successful.
std::cout << "connect successfull? ";
if (!connect_future.get().error.is_err()) {
// The connection succeeded
std::cout << "TRUE" << std::endl;
// Check whether or not the connection was successful.
std::cout << "connect successfull? ";
if (!connect_future.get().error.is_err()) {
// The connection succeeded
std::cout << "TRUE" << std::endl;
}
}

if (pool->size() > 0) {
// execute a query, switch keyspaces
boost::shared_future<cql::cql_future_result_t> future = pool->query("USE system;", cql::CQL_CONSISTENCY_ONE);

Expand All @@ -176,6 +217,27 @@ main(int argc,
print_rows(*future.get().result);
}

// More of the same from a multithreaded context.
pthread_t pids[numthreads];

for (int i = 0; i < numthreads; i++) {
pthread_t pid;
pthread_create( &pid, NULL, workThread, (void*) (pool.get()) );
pids[i] = pid;
}

// Let the threads run for a bit.
sleep (5);

terminate = true;
std::cout << "WRAPPING UP" << std::endl;

// give all threads the chance to finish
for (int i = 0; i < numthreads; i++) {
void *status;
pthread_join(pids[i], &status);
}

// close the connection pool
pool->close();
}
Expand Down
7 changes: 7 additions & 0 deletions include/libcql/internal/cql_callback_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@ namespace cql {
void
set_allocated()
{
// Something went badly wrong if the object is already allocated.
assert(e.next_free.index != -2);
e.next_free.index = -2;
}
};

typedef entry_t<value_t> array_entry_t;
array_entry_t* array;
int32_t next_free_index;
boost::mutex _mutex;
public:
explicit
small_indexed_storage(uint16_t size) :
Expand All @@ -116,6 +119,8 @@ namespace cql {
allocate()
{
int32_t result;
boost::mutex::scoped_lock lock(_mutex);

if ( (result = next_free_index) >= 0) {
if (array[next_free_index].next_free_cnt() > 0) {
array[++next_free_index].set_next_free(array[result].next_free_index(), array[result].next_free_cnt()-1);
Expand All @@ -133,6 +138,8 @@ namespace cql {
void
release(int32_t index)
{
boost::mutex::scoped_lock lock(_mutex);

array[index].set_next_free(next_free_index);
next_free_index = index;
}
Expand Down