Skip to content

Commit 0fb0dae

Browse files
Krishna Kondakafacebook-github-bot
authored andcommitted
Add support to replace the AccessPoint in ProxyDestination
Summary: Add support to replace the AccessPoint in ProxyDestination 1. Use ProxyMessage to send message to Proxy thread to do the actual change of access point in Proxy thread context and not in config thread context. The Proxy object does the actual replacement of the access point by finding the ProxyDestination in the ProxyDestinationMap using the temporary old access point constructed from the IP address that is replaced. 2. Put the new destination in TKO mode (because the new host won't be ready anyways and so pro-actively put the destination in TKO mode) 3. replace shared_ptr<AccessPoint> in ProxyDestination with folly::atomic_shared_ptr<AccessPoint> Description of the AP replacement logic: ``` caller calls Proxy->replaceAccessPoint() with temp old AP constructed from old IP address/port send a message to Proxy thread to find and replace old AP ``` Proxy thread upon receiving the replace AP message ``` calls processReplaceMessage( tmpOldAP, newAP) which calls ProxyDestinationMap::replace(tmpOldAP, newAP) This function uses tmpOldAP to find the realOldAP in the Proxy DestinationMap If it is found, it is erased first because the MAP used AP internally to identify the destination Then replaces the old AP with new AP and inserts the destination back into the MAP which internally uses new AP to correctly place it in the MAP Finally returns the old AP so that it map be used by the caller ``` Reviewed By: stuclar Differential Revision: D25416686 fbshipit-source-id: 7ad38e21dd60811f75d56903c0d6a029363c459d
1 parent 11c3e90 commit 0fb0dae

12 files changed

+197
-33
lines changed

mcrouter/Proxy-inl.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ void Proxy<RouterInfo>::messageReady(ProxyMessage::Type t, void* data) {
314314
delete oldConfig;
315315
} break;
316316

317+
case ProxyMessage::Type::REPLACE_AP: {
318+
auto rep = reinterpret_cast<replace_ap_t*>(data);
319+
processReplaceMessage(rep);
320+
rep->baton.post();
321+
} break;
322+
317323
case ProxyMessage::Type::SHUTDOWN:
318324
/*
319325
* No-op. We just wanted to wake this event base up so that
@@ -422,6 +428,41 @@ bool Proxy<RouterInfo>::messageQueueFull() const noexcept {
422428
return messageQueue_->isFull();
423429
}
424430

431+
template <class RouterInfo>
432+
void Proxy<RouterInfo>::processReplaceMessage(replace_ap_t* rep) {
433+
rep->replacedAccessPoint = nullptr;
434+
if (rep->oldAccessPoint.getProtocol() == mc_thrift_protocol) {
435+
rep->replacedAccessPoint =
436+
destinationMap_->replace<ThriftTransport<RouterInfo>>(
437+
rep->oldAccessPoint,
438+
rep->newAccessPoint,
439+
std::chrono::milliseconds(0));
440+
} else if (rep->oldAccessPoint.getProtocol() == mc_caret_protocol) {
441+
rep->replacedAccessPoint = destinationMap_->replace<AsyncMcClient>(
442+
rep->oldAccessPoint, rep->newAccessPoint, std::chrono::milliseconds(0));
443+
}
444+
}
445+
446+
template <class RouterInfo>
447+
std::shared_ptr<const AccessPoint> Proxy<RouterInfo>::replaceAP(
448+
const AccessPoint& oldAP,
449+
std::shared_ptr<const AccessPoint> newAP) {
450+
// Send message to replace AP in the ProxyDestination
451+
try {
452+
auto repMsg = std::make_unique<replace_ap_t>(oldAP, newAP);
453+
sendMessage(ProxyMessage::Type::REPLACE_AP, repMsg.get());
454+
// wait for the response
455+
repMsg->baton.wait();
456+
auto replacedAP = repMsg->replacedAccessPoint;
457+
VLOG(2) << ((replacedAP != nullptr) ? "SUCCESS " : "FAILED ")
458+
<< " Replacing oldAP = " << oldAP.toString()
459+
<< " newAP = " << newAP->toString();
460+
return replacedAP;
461+
} catch (...) {
462+
return nullptr;
463+
}
464+
}
465+
425466
} // namespace mcrouter
426467
} // namespace memcache
427468
} // namespace facebook

mcrouter/Proxy.h

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
#include "mcrouter/lib/carbon/Keys.h"
3434
#include "mcrouter/lib/mc/msg.h"
3535
#include "mcrouter/lib/mc/protocol.h"
36+
#include "mcrouter/lib/network/AsyncMcClient.h"
3637
#include "mcrouter/lib/network/CarbonMessageList.h"
38+
#include "mcrouter/lib/network/ThriftTransport.h"
3739
#include "mcrouter/lib/network/UniqueIntrusiveList.h"
3840
#include "mcrouter/options.h"
3941

@@ -63,7 +65,7 @@ class ProxyRequestContextTyped;
6365
class ShardSplitter;
6466

6567
struct ProxyMessage {
66-
enum class Type { REQUEST, OLD_CONFIG, SHUTDOWN };
68+
enum class Type { REQUEST, OLD_CONFIG, REPLACE_AP, SHUTDOWN };
6769

6870
Type type{Type::REQUEST};
6971
void* data{nullptr};
@@ -73,6 +75,35 @@ struct ProxyMessage {
7375
ProxyMessage(Type t, void* d) noexcept : type(t), data(d) {}
7476
};
7577

78+
// struct used for replace message
79+
//
80+
struct replace_ap_t {
81+
explicit replace_ap_t(
82+
const AccessPoint& oldAp,
83+
std::shared_ptr<const AccessPoint> newAp)
84+
: oldAccessPoint(oldAp), newAccessPoint(std::move(newAp)) {}
85+
86+
// oldAccessPoint contains IP+port+other AP related info of the host that just
87+
// got removed
88+
AccessPoint oldAccessPoint;
89+
// newAccessPoint contains IP+port+other AP related info of the host that just
90+
// got added
91+
std::shared_ptr<const AccessPoint> newAccessPoint;
92+
// replacedAccessPoint
93+
// is the RETURN value from replace command which contains the
94+
// actual access point corresponding to the removed host
95+
// which is in the internal data structures like
96+
// ProxyDestination, ProxyDestinationMap, AccessPoints set in
97+
// ProxyConfig.
98+
// Since this is the actual AP present in the internal data
99+
// structures, we need to find this using the oldAccessPoint
100+
// and querying ProxyDestinationMap
101+
std::shared_ptr<const AccessPoint> replacedAccessPoint;
102+
103+
// baton to wait on for the completion of the message processing
104+
folly::fibers::Baton baton;
105+
};
106+
76107
template <class RouterInfo>
77108
class Proxy : public ProxyBase {
78109
public:
@@ -131,6 +162,25 @@ class Proxy : public ProxyBase {
131162
return requestStats_.dump(filterZeroes);
132163
}
133164

165+
void processReplaceMessage(replace_ap_t* rep);
166+
167+
// replaceAP
168+
// @oldAP temporary AP constructed from IP address of the host
169+
// that got replaced
170+
// @newAP new AP that is used to replace oldAP in the ProxyDestination and
171+
// ProxyDestinationMap
172+
//
173+
// Returns
174+
// replacedAP AP in ProxyDestinationMap corresponding to the host that got
175+
// replaced, this is the real AP to be replaced. oldAP is used
176+
// to find this real AP
177+
// Replace message is sent to proxy thread to find the AP to be replaced
178+
// using 'oldAP' and then the AP is replaced by newAP and the replaced AP
179+
// is returned to caller
180+
std::shared_ptr<const AccessPoint> replaceAP(
181+
const AccessPoint& oldAP,
182+
std::shared_ptr<const AccessPoint> newAP);
183+
134184
void advanceRequestStatsBin() override {
135185
requestStats().advanceBin();
136186
}

mcrouter/ProxyConfig.h

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,27 @@ class ProxyConfig {
7979
return it->second.first;
8080
}
8181

82-
const folly::StringKeyedUnorderedMap<
83-
std::vector<std::shared_ptr<const AccessPoint>>>&
84-
getAccessPoints() const {
82+
bool updateAccessPoints(
83+
const std::string& pool,
84+
std::shared_ptr<const AccessPoint>& oldAccessPoint,
85+
std::shared_ptr<const AccessPoint>& newAccessPoint) {
86+
auto it = accessPoints_.find(pool);
87+
if (it != accessPoints_.end()) {
88+
auto apIt = it->second.find(oldAccessPoint);
89+
if (apIt != it->second.end()) {
90+
it->second.erase(apIt);
91+
} else {
92+
return false;
93+
}
94+
it->second.insert(newAccessPoint);
95+
return true;
96+
}
97+
return false;
98+
}
99+
100+
folly::StringKeyedUnorderedMap<
101+
std::unordered_set<std::shared_ptr<const AccessPoint>>>&
102+
getAccessPoints() {
85103
return accessPoints_;
86104
}
87105

@@ -92,7 +110,7 @@ class ProxyConfig {
92110
// config (after all RouteHandles) because its keys are being referenced
93111
// by object in the Config.
94112
folly::StringKeyedUnorderedMap<
95-
std::vector<std::shared_ptr<const AccessPoint>>>
113+
std::unordered_set<std::shared_ptr<const AccessPoint>>>
96114
accessPoints_;
97115

98116
// pool source name -> (allow_partial_reconfig, [(pool_config,[pool_names])])

mcrouter/ProxyDestination.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ class ProxyDestination : public ProxyDestinationBase {
8686
return selfPtr_;
8787
}
8888

89+
std::shared_ptr<const AccessPoint> replaceAP(
90+
std::shared_ptr<const AccessPoint> newAccessPoint) {
91+
auto ret = ProxyDestinationBase::replaceAP(newAccessPoint);
92+
closeGracefully();
93+
return ret;
94+
}
95+
8996
private:
9097
std::unique_ptr<Transport, typename Transport::Destructor> transport_;
9198
// Ensure proxy thread doesn't reset the Transport

mcrouter/ProxyDestinationBase.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ bool ProxyDestinationBase::maySend(carbon::Result& tkoReason) const {
114114
void ProxyDestinationBase::onTkoEvent(TkoLogEvent event, carbon::Result result)
115115
const {
116116
auto logUtil = [this, result](folly::StringPiece eventStr) {
117-
VLOG(2) << accessPoint_->toHostPortString() << " " << eventStr
117+
VLOG(2) << accessPoint_.load()->toHostPortString() << " " << eventStr
118118
<< ". Total hard TKOs: " << tracker_->globalTkos().hardTkos
119119
<< "; soft TKOs: " << tracker_->globalTkos().softTkos
120120
<< ". Reply: " << carbon::resultToString(result);
@@ -135,7 +135,7 @@ void ProxyDestinationBase::onTkoEvent(TkoLogEvent event, carbon::Result result)
135135
break;
136136
}
137137

138-
TkoLog tkoLog(*accessPoint_, tracker_->globalTkos());
138+
TkoLog tkoLog(*accessPoint_.load(), tracker_->globalTkos());
139139
tkoLog.event = event;
140140
tkoLog.isHardTko = tracker_->isHardTko();
141141
tkoLog.isSoftTko = tracker_->isSoftTko();

mcrouter/ProxyDestinationBase.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <memory>
1313

1414
#include <folly/IntrusiveList.h>
15+
#include <folly/concurrency/AtomicSharedPtr.h>
1516

1617
#include "mcrouter/ExponentialSmoothData.h"
1718
#include "mcrouter/TkoLog.h"
@@ -97,8 +98,8 @@ class ProxyDestinationBase {
9798
/**
9899
* Destination host that this client talks to.
99100
*/
100-
const std::shared_ptr<const AccessPoint>& accessPoint() const {
101-
return accessPoint_;
101+
const std::shared_ptr<const AccessPoint> accessPoint() const {
102+
return accessPoint_.load();
102103
}
103104

104105
/**
@@ -174,12 +175,17 @@ class ProxyDestinationBase {
174175
return probeInflight_;
175176
}
176177

178+
std::shared_ptr<const AccessPoint> replaceAP(
179+
std::shared_ptr<const AccessPoint> newAccessPoint) {
180+
return accessPoint_.exchange(newAccessPoint);
181+
}
182+
177183
private:
178184
ProxyBase& proxy_;
179185
std::shared_ptr<TkoTracker> tracker_;
180186

181187
// Destination host information
182-
const std::shared_ptr<const AccessPoint> accessPoint_;
188+
folly::atomic_shared_ptr<const AccessPoint> accessPoint_;
183189
std::chrono::milliseconds shortestConnectTimeout_{0};
184190
std::chrono::milliseconds shortestWriteTimeout_{0};
185191
const uint32_t qosClass_{0};

mcrouter/ProxyDestinationKey.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ ProxyDestinationKey::ProxyDestinationKey(
2222
std::chrono::milliseconds timeout_)
2323
: accessPoint(ap), timeout(timeout_) {}
2424

25+
ProxyDestinationKey::ProxyDestinationKey(const AccessPoint& ap)
26+
: ProxyDestinationKey(ap, std::chrono::milliseconds{0}) {}
27+
2528
std::string ProxyDestinationKey::str() const {
2629
if (accessPoint.getProtocol() == mc_ascii_protocol) {
2730
// we cannot send requests with different timeouts for ASCII, since

mcrouter/ProxyDestinationKey.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct ProxyDestinationKey {
2525

2626
explicit ProxyDestinationKey(const ProxyDestinationBase& dst);
2727
ProxyDestinationKey(const AccessPoint& ap, std::chrono::milliseconds timeout);
28+
explicit ProxyDestinationKey(const AccessPoint& ap);
2829

2930
std::string str() const;
3031
size_t hash() const;

mcrouter/ProxyDestinationMap-inl.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,33 @@ namespace facebook {
2020
namespace memcache {
2121
namespace mcrouter {
2222

23+
template <class Transport>
24+
std::shared_ptr<const AccessPoint> ProxyDestinationMap::replace(
25+
const AccessPoint& tmpAccessPoint,
26+
std::shared_ptr<const AccessPoint> newAccessPoint,
27+
const std::chrono::milliseconds timeout) {
28+
// Find the destination with old access point using
29+
// temp access point. If it is found, the old access point is replaced
30+
// with new access point
31+
std::lock_guard<std::mutex> lck(destinationsLock_);
32+
auto it = destinations_.find(ProxyDestinationKey(tmpAccessPoint, timeout));
33+
if (it != destinations_.end()) {
34+
auto destination = std::dynamic_pointer_cast<ProxyDestination<Transport>>(
35+
(*it)->selfPtr().lock());
36+
// Erase the destination before the access point is replaced because
37+
// the internal logic to identify the destination inside destinations_
38+
// uses access point info as Key
39+
destinations_.erase(destination.get());
40+
auto oldAccessPoint = destination->replaceAP(newAccessPoint);
41+
// After the access point is replaced, insert the destination back into
42+
// destinations with new access point used as Key
43+
destinations_.emplace(destination.get());
44+
// force the destination in TKO state
45+
destination->handleTko(carbon::Result::CONNECT_ERROR, false);
46+
return oldAccessPoint;
47+
}
48+
return nullptr;
49+
}
2350
template <class Transport>
2451
std::shared_ptr<ProxyDestination<Transport>> ProxyDestinationMap::emplace(
2552
std::shared_ptr<AccessPoint> ap,

mcrouter/ProxyDestinationMap.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ class ProxyDestinationMap {
6868
uint32_t qosPath,
6969
const std::shared_ptr<PoolTkoTracker>& poolTkoTracker);
7070

71+
template <class Transport>
72+
std::shared_ptr<const AccessPoint> replace(
73+
const AccessPoint& tmpOldAccessPoint,
74+
std::shared_ptr<const AccessPoint> newAccessPoint,
75+
std::chrono::milliseconds timeout);
76+
7177
std::shared_ptr<PoolTkoTracker> createPoolTkoTracker(
7278
std::string poolName,
7379
uint32_t numEnterSoftTkos,

0 commit comments

Comments
 (0)