Skip to content

Commit b203e54

Browse files
authored
Merge pull request #914 from lonvia/db-copy-class
Introduce separate COPY threads
2 parents 0275c1a + b89aea3 commit b203e54

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1411
-1257
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ if (NOT HAVE_UNISTD_H AND NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/unistd.h)
168168
endif()
169169

170170
set(osm2pgsql_lib_SOURCES
171+
db-copy.cpp
171172
expire-tiles.cpp
172173
geometry-processor.cpp
173174
id-tracker.cpp
@@ -197,6 +198,7 @@ set(osm2pgsql_lib_SOURCES
197198
tagtransform-c.cpp
198199
util.cpp
199200
wildcmp.cpp
201+
db-copy.hpp
200202
expire-tiles.hpp
201203
geometry-processor.hpp
202204
id-tracker.hpp

db-copy.cpp

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
#include <boost/format.hpp>
2+
#include <cassert>
3+
#include <cstdio>
4+
#include <future>
5+
#include <thread>
6+
7+
#include "db-copy.hpp"
8+
#include "pgsql.hpp"
9+
10+
using fmt = boost::format;
11+
12+
db_copy_thread_t::db_copy_thread_t(std::string const &conninfo)
13+
: m_conninfo(conninfo), m_conn(nullptr)
14+
{
15+
m_worker = std::thread([this]() {
16+
try {
17+
worker_thread();
18+
} catch (std::runtime_error const &e) {
19+
fprintf(stderr, "DB writer thread failed due to ERROR: %s\n",
20+
e.what());
21+
exit(2);
22+
}
23+
});
24+
}
25+
26+
db_copy_thread_t::~db_copy_thread_t() { finish(); }
27+
28+
void db_copy_thread_t::add_buffer(std::unique_ptr<db_cmd_t> &&buffer)
29+
{
30+
assert(m_worker.joinable()); // thread must not have been finished
31+
std::unique_lock<std::mutex> lock(m_queue_mutex);
32+
m_worker_queue.push_back(std::move(buffer));
33+
m_queue_cond.notify_one();
34+
}
35+
36+
void db_copy_thread_t::sync_and_wait()
37+
{
38+
std::promise<void> barrier;
39+
std::future<void> sync = barrier.get_future();
40+
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_sync_t(std::move(barrier))));
41+
sync.wait();
42+
}
43+
44+
void db_copy_thread_t::finish()
45+
{
46+
if (m_worker.joinable()) {
47+
finish_copy();
48+
49+
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_finish_t()));
50+
m_worker.join();
51+
}
52+
}
53+
54+
void db_copy_thread_t::worker_thread()
55+
{
56+
connect();
57+
58+
bool done = false;
59+
while (!done) {
60+
std::unique_ptr<db_cmd_t> item;
61+
{
62+
std::unique_lock<std::mutex> lock(m_queue_mutex);
63+
if (m_worker_queue.empty()) {
64+
m_queue_cond.wait(lock);
65+
continue;
66+
}
67+
68+
item = std::move(m_worker_queue.front());
69+
m_worker_queue.pop_front();
70+
}
71+
72+
switch (item->type) {
73+
case db_cmd_t::Cmd_copy:
74+
write_to_db(static_cast<db_cmd_copy_t *>(item.get()));
75+
break;
76+
case db_cmd_t::Cmd_sync:
77+
finish_copy();
78+
static_cast<db_cmd_sync_t *>(item.get())->barrier.set_value();
79+
break;
80+
case db_cmd_t::Cmd_finish:
81+
done = true;
82+
break;
83+
}
84+
}
85+
86+
finish_copy();
87+
88+
disconnect();
89+
}
90+
91+
void db_copy_thread_t::connect()
92+
{
93+
assert(!m_conn);
94+
95+
PGconn *conn = PQconnectdb(m_conninfo.c_str());
96+
if (PQstatus(conn) != CONNECTION_OK)
97+
throw std::runtime_error(
98+
(fmt("Connection to database failed: %1%\n") % PQerrorMessage(conn))
99+
.str());
100+
m_conn = conn;
101+
102+
// Let commits happen faster by delaying when they actually occur.
103+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK,
104+
"SET synchronous_commit TO off;");
105+
}
106+
107+
void db_copy_thread_t::disconnect()
108+
{
109+
if (!m_conn)
110+
return;
111+
112+
PQfinish(m_conn);
113+
m_conn = nullptr;
114+
}
115+
116+
void db_copy_thread_t::write_to_db(db_cmd_copy_t *buffer)
117+
{
118+
if (!buffer->deletables.empty() ||
119+
(m_inflight && !buffer->target->same_copy_target(*m_inflight.get())))
120+
finish_copy();
121+
122+
if (!buffer->deletables.empty())
123+
delete_rows(buffer);
124+
125+
if (!m_inflight)
126+
start_copy(buffer->target);
127+
128+
pgsql_CopyData(buffer->target->name.c_str(), m_conn, buffer->buffer);
129+
}
130+
131+
void db_copy_thread_t::delete_rows(db_cmd_copy_t *buffer)
132+
{
133+
assert(!m_inflight);
134+
135+
std::string sql = "DELETE FROM ";
136+
sql.reserve(buffer->target->name.size() + buffer->deletables.size() * 15 +
137+
30);
138+
sql += buffer->target->name;
139+
sql += " WHERE ";
140+
sql += buffer->target->id;
141+
sql += " IN (";
142+
for (auto id : buffer->deletables) {
143+
sql += std::to_string(id);
144+
sql += ',';
145+
}
146+
sql[sql.size() - 1] = ')';
147+
148+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql);
149+
}
150+
151+
void db_copy_thread_t::start_copy(std::shared_ptr<db_target_descr_t> const &target)
152+
{
153+
m_inflight = target;
154+
155+
std::string copystr = "COPY ";
156+
copystr.reserve(target->name.size() + target->rows.size() + 14);
157+
copystr += target->name;
158+
if (!target->rows.empty()) {
159+
copystr += '(';
160+
copystr += target->rows;
161+
copystr += ')';
162+
}
163+
copystr += " FROM STDIN";
164+
pgsql_exec_simple(m_conn, PGRES_COPY_IN, copystr);
165+
166+
m_inflight = target;
167+
}
168+
169+
void db_copy_thread_t::finish_copy()
170+
{
171+
if (!m_inflight)
172+
return;
173+
174+
if (PQputCopyEnd(m_conn, nullptr) != 1)
175+
throw std::runtime_error((fmt("stop COPY_END for %1% failed: %2%\n") %
176+
m_inflight->name %
177+
PQerrorMessage(m_conn))
178+
.str());
179+
180+
pg_result_t res(PQgetResult(m_conn));
181+
if (PQresultStatus(res.get()) != PGRES_COMMAND_OK)
182+
throw std::runtime_error((fmt("result COPY_END for %1% failed: %2%\n") %
183+
m_inflight->name %
184+
PQerrorMessage(m_conn))
185+
.str());
186+
187+
m_inflight.reset();
188+
}
189+
190+
db_copy_mgr_t::db_copy_mgr_t(std::shared_ptr<db_copy_thread_t> const &processor)
191+
: m_processor(processor)
192+
{}
193+
194+
void db_copy_mgr_t::new_line(std::shared_ptr<db_target_descr_t> const &table)
195+
{
196+
if (!m_current || !m_current->target->same_copy_target(*table.get())) {
197+
if (m_current) {
198+
m_processor->add_buffer(std::move(m_current));
199+
}
200+
201+
m_current.reset(new db_cmd_copy_t(table));
202+
}
203+
}
204+
205+
void db_copy_mgr_t::delete_id(osmid_t osm_id)
206+
{
207+
assert(m_current);
208+
m_current->deletables.push_back(osm_id);
209+
}
210+
211+
void db_copy_mgr_t::sync()
212+
{
213+
// finish any ongoing copy operations
214+
if (m_current) {
215+
m_processor->add_buffer(std::move(m_current));
216+
}
217+
218+
m_processor->sync_and_wait();
219+
}
220+
221+
void db_copy_mgr_t::finish_line()
222+
{
223+
assert(m_current);
224+
225+
auto &buf = m_current->buffer;
226+
assert(!buf.empty());
227+
228+
// Expect that a column has been written last which ended in a '\t'.
229+
// Replace it with the row delimiter '\n'.
230+
auto sz = buf.size();
231+
assert(buf[sz - 1] == '\t');
232+
buf[sz - 1] = '\n';
233+
234+
if (sz > db_cmd_copy_t::Max_buf_size - 100) {
235+
m_processor->add_buffer(std::move(m_current));
236+
}
237+
}
238+

0 commit comments

Comments
 (0)