From 2cdbbf316ffee688a031ffe7fe477918a90302f7 Mon Sep 17 00:00:00 2001 From: David Murphy Date: Mon, 14 Dec 2015 17:23:19 -0600 Subject: [PATCH 1/3] Initial Rebase of toku2mongo for 2.02 --- src/mongo/CMakeLists.txt | 3 + src/mongo/db/gtid.cpp | 23 +- src/mongo/db/gtid.h | 4 + src/mongo/tools/toku2mongo.cpp | 391 +++++++++++++++++++++++++++++++++ 4 files changed, 419 insertions(+), 2 deletions(-) create mode 100644 src/mongo/tools/toku2mongo.cpp diff --git a/src/mongo/CMakeLists.txt b/src/mongo/CMakeLists.txt index d3a52d45c9..d3e322990d 100644 --- a/src/mongo/CMakeLists.txt +++ b/src/mongo/CMakeLists.txt @@ -593,6 +593,7 @@ foreach (tool add_executable(mongo${tool} tools/${tool}) endforeach () add_executable(bsondump tools/bsondump) +add_executable(toku2mongo tools/toku2mongo) foreach (tool mongodump mongorestore @@ -604,6 +605,7 @@ foreach (tool mongofiles mongobridge bsondump + toku2mongo ) add_dependencies(${tool} generate_error_codes generate_action_types install_tdb_h) if (NOT APPLE) @@ -627,6 +629,7 @@ install_executables(tokumx_tools mongo2toku mongofiles bsondump + toku2mongo ) target_link_libraries(mongofiles gridfs) diff --git a/src/mongo/db/gtid.cpp b/src/mongo/db/gtid.cpp index 50508cabfb..f4f20e5d13 100644 --- a/src/mongo/db/gtid.cpp +++ b/src/mongo/db/gtid.cpp @@ -18,6 +18,7 @@ #include "mongo/db/gtid.h" +#include "mongo/base/parse_number.h" #include "mongo/bson/util/builder.h" #include "mongo/util/time_support.h" @@ -87,11 +88,29 @@ namespace mongo { bool GTID::isInitial() const { return (_primarySeqNo == 0); } + + std::string GTID::toConciseString() const { + std::stringstream ss; + ss << _primarySeqNo << ":" << _GTSeqNo; + return ss.str(); + } + Status GTID::parseConciseString(const StringData &s, GTID >id) { + size_t colonPos = s.find(":"); + StringData primaryStr = s.substr(0, colonPos); + StringData seqStr = s.substr(colonPos + 1); + Status status = parseNumberFromString(primaryStr, >id._primarySeqNo); + if (!status.isOK()) { + return status; + 
} + status = parseNumberFromString(seqStr, >id._GTSeqNo); + return status; + } + uint64_t GTID::getPrimary() const { return _primarySeqNo; - } - + } + GTIDManager::GTIDManager( GTID lastGTID, uint64_t lastTime, uint64_t lastHash, uint32_t id, uint64_t lastVotedForPrimary ) { _selfID = id; _lastLiveGTID = lastGTID; diff --git a/src/mongo/db/gtid.h b/src/mongo/db/gtid.h index a9192cda56..8887f752fe 100644 --- a/src/mongo/db/gtid.h +++ b/src/mongo/db/gtid.h @@ -20,6 +20,8 @@ //#include "mongo/db/jsobj.h" #include +#include "mongo/base/status.h" + namespace mongo { class BSONObjBuilder; @@ -42,6 +44,8 @@ namespace mongo { void setPrimaryTo(uint64_t newPrimary); string toString() const; bool isInitial() const; + static Status parseConciseString(const StringData &s, GTID >id); + std::string toConciseString() const; uint64_t getPrimary() const; bool operator==(const GTID& other) const { return _primarySeqNo == other._primarySeqNo && _GTSeqNo == other._GTSeqNo; diff --git a/src/mongo/tools/toku2mongo.cpp b/src/mongo/tools/toku2mongo.cpp new file mode 100644 index 0000000000..8e443b55a9 --- /dev/null +++ b/src/mongo/tools/toku2mongo.cpp @@ -0,0 +1,391 @@ +// toku2mongo.cpp +/** +* Copyright (C) 2014 Tokutek Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "mongo/pch.h" + +#include "mongo/tools/tool.h" + +#include "mongo/base/initializer.h" +#include "mongo/base/string_data.h" +#include "mongo/bson/util/misc.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/client.h" +#include "mongo/db/gtid.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/namespacestring.h" +#include "mongo/db/oplogreader.h" +#include "mongo/db/oplog_field_names.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/password.h" + +#include +#include +#include +#include + +#include +#include + +using namespace mongo; + +namespace po = boost::program_options; + +class TokuOplogTool : public Tool { + bool applyOps(BSONElement opsArray) { + verify(opsArray.type() == mongo::Array); + for (BSONObjIterator it(opsArray.Obj()); it.more(); ++it) { + const BSONObj op = (*it).Obj(); + const StringData type = op[KEY_STR_OP_NAME].Stringdata(); + if (type == OP_STR_COMMENT) { + // noop + continue; + } + if (op[KEY_STR_NS].type() != String) { + error() << "invalid ns in op " << op << endl; + return false; + } + const char *ns = op[KEY_STR_NS].valuestrsafe(); + if (type == OP_STR_INSERT || type == OP_STR_CAPPED_INSERT) { + const BSONObj obj = op[KEY_STR_ROW].Obj(); + _conn->insert(ns, obj); + } else if (type == OP_STR_UPDATE) { + const BSONObj oldObj = op[KEY_STR_OLD_ROW].Obj(); + const BSONObj newObj = op[KEY_STR_NEW_ROW].Obj(); + _conn->remove(ns, oldObj, true); + _conn->insert(ns, newObj); + } else if (type == OP_STR_UPDATE_ROW_WITH_MOD) { + const BSONObj pk = op[KEY_STR_PK].Obj(); + const BSONObj mods = op[KEY_STR_MODS].Obj(); + _conn->update(ns, pk, mods); + } else if (type == OP_STR_DELETE || type == OP_STR_CAPPED_DELETE) { + const BSONObj obj = op[KEY_STR_ROW].Obj(); + _conn->remove(ns, obj); + } else if (type == OP_STR_COMMAND) { + const BSONObj cmd = op[KEY_STR_ROW].Obj(); + if (nsToCollectionSubstring(ns) != "$cmd") { + error() << "invalid command op " << op << endl; + return false; + 
} + BSONObj info = _conn->findOne(ns, cmd); + if (!info["ok"].trueValue()) { + error() << "error replaying command op " << op << ": " << info << endl; + return false; + } + } else { + error() << "unrecognized type \"" << type << "\" in op " << op << endl; + return false; + } + + std::string lastErr = _conn->getLastError(nsToDatabase(ns), false, false, _w); + if (!lastErr.empty()) { + error() << lastErr << endl; + return false; + } + } + return true; + } + + int _run() { + if ( ! hasParam( "from" ) ) { + log() << "need to specify --from" << endl; + return -1; + } + + if (currentClient.get() == 0) { + Client::initThread( "toku2mongo" ); + } + + LOG(1) << "going to connect" << endl; + + OplogReader r(false); + r.connect( getParam( "from" ) ); + + if (hasParam("ruser")) { + if (!hasParam("rpass")) { + log() << "if using auth on source, must specify both --ruser and --rpass" << endl; + return -1; + } + try { + r.conn()->auth(BSON("user" << getParam("ruser") << + "userSource" << _rauthenticationDatabase << + "pwd" << _rpass << + "mechanism" << _authenticationMechanism)); + } catch (DBException &e) { + if (e.getCode() == ErrorCodes::AuthenticationFailed) { + error() << "error authenticating to " << _rauthenticationDatabase << " on source: " + << e.what() << endl; + return -1; + } + throw; + } + } + + LOG(1) << "connected" << endl; + + { + string gtidString; + if (hasParam("gtid")) { + gtidString = getParam("gtid"); + } else { + try { + ifstream gtidFile; + gtidFile.exceptions(std::ifstream::badbit | std::ifstream::failbit); + gtidFile.open(_gtidFilename); + gtidFile >> gtidString; + gtidFile.close(); + } catch (std::exception &e) { + warning() << "Couldn't read GTID from file " << _gtidFilename << ": " << e.what() << endl; + } + } + if (gtidString.empty()) { + error() << "No starting GTID provided. " + << "Please find the right starting point and run again with --gtid." 
<< endl; + return -1; + } + Status s = GTID::parseConciseString(gtidString, _maxGTIDSynced); + if (!s.isOK()) { + error() << "error parsing GTID " << gtidString << ": " << s.reason() << endl; + return -1; + } + } + + try { + while (running) { + r.tailCheck(); + bool needsGTIDCheck = false; + if (!r.haveCursor()) { + r.tailingQueryGTE(_oplogns.c_str(), _maxGTIDSynced); + needsGTIDCheck = true; + } + if (!r.haveCursor()) { + error() << "oplog tailing query failed for unknown reason" << endl; + return -1; + } + + if (_reportingTimer.seconds() >= _reportingPeriod) { + report(r); + } + + while (running && r.more()) { + BSONObj o = r.nextSafe(); + LOG(2) << o << endl; + + if (needsGTIDCheck) { + GTID gtid = getGTIDFromBSON("_id", o); + if (GTID::cmp(gtid, _maxGTIDSynced) > 0) { + error() << "Wanted to start from GTID " << _maxGTIDSynced.toConciseString() + << " but could only find " << gtid.toConciseString() << "." << endl; + error() << "You appear to have fallen too far behind in replication," + << " the destination needs to start from scratch." << endl; + return -1; + } else if (GTID::cmp(gtid, _maxGTIDSynced) < 0) { + error() << "Wanted to start from GTID " << _maxGTIDSynced.toConciseString() + << " but we found " << gtid.toConciseString() + << ", which is earlier." << endl; + error() << "This seems like a query bug, please report it." << endl; + return -1; + } + needsGTIDCheck = false; + // Skip the first op, we believe we've already applied it. 
+ continue; + } + + if (o.hasElement("ref")) { + OID oid = o["ref"].OID(); + long long seq = 0; + for (shared_ptr refsCursor = r.getOplogRefsCursor(oid); refsCursor->more(); ) { + BSONObj refsObj = refsCursor->nextSafe(); + if (refsObj.getFieldDotted("_id.oid").OID() != oid) { + break; + } + long long thisSeq = refsObj.getFieldDotted("_id.seq").Long(); + if (thisSeq < seq + 1) { + error() << "broken sequence of refs entries, last seq was " << seq << " but next object found was " << refsObj << endl; + return -1; + } + seq = thisSeq; + if (!applyOps(refsObj["ops"])) { + return -1; + } + + if (_reportingTimer.seconds() >= _reportingPeriod) { + report(r); + } + } + } else if (o.hasElement("ops")) { + if (!applyOps(o["ops"])) { + return -1; + } + } else { + error() << "invalid oplog entry " << o << endl; + return -1; + } + + _maxGTIDSynced = getGTIDFromBSON("_id", o); + _maxTimestampSynced = o["ts"].Date(); + + if (_reportingTimer.seconds() >= _reportingPeriod) { + report(r); + } + } + } + } catch (DBException &e) { + error() << "Caught exception " << e.what() << ". Exiting..." << endl; + return -1; + } catch (...) { + error() << "Caught unknown exception, Exiting..." << endl; + return -1; + } + + return 0; + } + + void report(OplogReader &rdr) const { + Nullstream& l = log(); + l << "synced up to " << _maxGTIDSynced.toConciseString() + << " (" << time_t_to_String_short(_maxTimestampSynced.toTimeT()) << ")"; + _reportingTimer.reset(); + BSONObj lastObj = rdr.getLastOp(_oplogns.c_str()); + GTID lastGTID = getGTIDFromBSON("_id", lastObj); + Date_t lastDate = lastObj["ts"].Date(); + l << ", source has up to " << lastGTID.toConciseString() + << " (" << time_t_to_String_short(lastDate.toTimeT()) << ")"; + if (GTID::cmp(lastGTID, _maxGTIDSynced) == 0) { + l << ", fully synced." << endl; + } + else { + int64_t diff = lastDate.millis - _maxTimestampSynced.millis; + if (diff > 1000) { + l << ", " << (diff / 1000) << " seconds behind source." 
<< endl; + } + else { + l << ", less than 1 second behind source." << endl; + } + } + } + + string _oplogns; + string _rpass; + string _rauthenticationDatabase; + string _rauthenticationMechanism; + GTID _maxGTIDSynced; + Date_t _maxTimestampSynced; + int _w; + int _reportingPeriod; + mutable Timer _reportingTimer; + static const char *_gtidFilename; + +public: + TokuOplogTool() : Tool( "2mongo" ) { + add_options() + ("gtid" , po::value() , "max applied GTID" ) + ("w", po::value(&_w)->default_value(1), "w parameter for getLastError calls") + ("from", po::value() , "host to pull from" ) + ("ruser", po::value(), "username on source host if auth required" ) + ("rpass", new PasswordValue( &_rpass ), "password on source host" ) + ("rauthenticationDatabase", + po::value(&_rauthenticationDatabase)->default_value("admin"), + "user source on source host (defaults to \"admin\")" ) + ("rauthenticationMechanism", + po::value(&_rauthenticationMechanism)->default_value("MONGODB-CR"), + "authentication mechanism on source host") + ("oplogns", po::value(&_oplogns)->default_value( "local.oplog.rs" ) , "ns to pull from" ) + ("reportingPeriod", po::value(&_reportingPeriod)->default_value(10) , "seconds between progress reports" ) + ; + } + + virtual void printExtraHelp(ostream& out) { + out << "Pull and replay a remote TokuMX oplog.\n" << endl; + } + + static volatile bool running; + + void logPosition() { + if (GTID::cmp(_maxGTIDSynced, GTID()) != 0) { + std::string gtidString = _maxGTIDSynced.toConciseString(); + log() << "Exiting while processing GTID " << gtidString << endl; + log() << "Use --gtid=" << gtidString << " to resume." << endl; + try { + std::ofstream gtidFile; + gtidFile.exceptions(std::ofstream::badbit | std::ofstream::failbit); + gtidFile.open(_gtidFilename, std::ofstream::trunc); + gtidFile << gtidString; + gtidFile.close(); + log() << "Saved GTID to file " + << (boost::filesystem::current_path() / _gtidFilename).string() << "." 
<< endl; + log() << "I'll automatically use this value next time if you run from this directory " + << "and don't pass --gtid." << endl; + } catch (std::exception &e) { + warning() << "Error saving GTID to file " << _gtidFilename << ": " << e.what() << endl; + warning() << "Make sure you save the GTID somewhere, because I couldn't!" << endl; + } + } + } + + int run() { + int ret = _run(); + logPosition(); + return ret; + } +}; + +const char *TokuOplogTool::_gtidFilename = "__toku2mongo_saved_timestamp__"; + +volatile bool TokuOplogTool::running = false; + +namespace proc_mgmt { + + TokuOplogTool *theTool = NULL; + + static void fatal_handler(int sig) { + signal(sig, SIG_DFL); + log() << "Received signal " << sig << "." << endl; + warning() << "Dying immediately on fatal signal." << endl; + if (theTool != NULL) { + theTool->logPosition(); + } + ::abort(); + } + static void exit_handler(int sig) { + signal(sig, SIG_DFL); + log() << "Received signal " << sig << "." << endl; + log() << "Will exit soon." 
<< endl; + TokuOplogTool::running = false; + } + +} + +int main( int argc , char** argv, char **envp ) { + mongo::runGlobalInitializersOrDie(argc, argv, envp); + TokuOplogTool t; + t.running = true; + proc_mgmt::theTool = &t; + signal(SIGILL, proc_mgmt::fatal_handler); + signal(SIGABRT, proc_mgmt::fatal_handler); + signal(SIGFPE, proc_mgmt::fatal_handler); + signal(SIGSEGV, proc_mgmt::fatal_handler); + signal(SIGHUP, proc_mgmt::exit_handler); + signal(SIGINT, proc_mgmt::exit_handler); + signal(SIGQUIT, proc_mgmt::exit_handler); + signal(SIGPIPE, proc_mgmt::exit_handler); + signal(SIGALRM, proc_mgmt::exit_handler); + signal(SIGTERM, proc_mgmt::exit_handler); + signal(SIGUSR1, SIG_IGN); + signal(SIGUSR2, SIG_IGN); + return t.main( argc , argv ); +} From 3a098370a029747c8188234681655297dfabd9db Mon Sep 17 00:00:00 2001 From: David Murphy Date: Mon, 14 Dec 2015 17:33:13 -0600 Subject: [PATCH 2/3] Added jsmet's patch to remap the primary key that TokuMX uses as its unique identifier to the _id that MongoDB uses --- esmet.patch | 36 ++++++++++++++++++++++++++++++++++ src/mongo/tools/toku2mongo.cpp | 16 +++++++++++++-- 2 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 esmet.patch diff --git a/esmet.patch b/esmet.patch new file mode 100644 index 0000000000..79eb16e4d6 --- /dev/null +++ b/esmet.patch @@ -0,0 +1,36 @@ +diff --git a/src/mongo/tools/toku2mongo.cpp b/src/mongo/tools/toku2mongo.cpp +index 8e443b5..4ca658f 100644 +--- a/src/mongo/tools/toku2mongo.cpp ++++ b/src/mongo/tools/toku2mongo.cpp +@@ -46,6 +46,18 @@ using namespace mongo; + namespace po = boost::program_options; + + class TokuOplogTool : public Tool { ++ ++ BSONObj primaryKeyToIdKey(const BSONObj &pk) { ++ /* The last field of any primary key _must_ be the _id field. 
*/ ++ BSONElement e; ++ for (BSONObjIterator it(pk); it.more(); ++it) { ++ e = *it; ++ } ++ BSONObj o = BSON("_id" << e); ++ log() << "primaryKeyToIdKey: pk = " << pk << ", o = " << o << endl; ++ return o; ++ } ++ + bool applyOps(BSONElement opsArray) { + verify(opsArray.type() == mongo::Array); + for (BSONObjIterator it(opsArray.Obj()); it.more(); ++it) { +@@ -69,9 +81,9 @@ class TokuOplogTool : public Tool { + _conn->remove(ns, oldObj, true); + _conn->insert(ns, newObj); + } else if (type == OP_STR_UPDATE_ROW_WITH_MOD) { +- const BSONObj pk = op[KEY_STR_PK].Obj(); ++ const BSONObj id = primaryKeyToIdKey(op[KEY_STR_PK].Obj()); + const BSONObj mods = op[KEY_STR_MODS].Obj(); +- _conn->update(ns, pk, mods); ++ _conn->update(ns, id, mods); + } else if (type == OP_STR_DELETE || type == OP_STR_CAPPED_DELETE) { + const BSONObj obj = op[KEY_STR_ROW].Obj(); + _conn->remove(ns, obj); + diff --git a/src/mongo/tools/toku2mongo.cpp b/src/mongo/tools/toku2mongo.cpp index 8e443b55a9..4ca658fa90 100644 --- a/src/mongo/tools/toku2mongo.cpp +++ b/src/mongo/tools/toku2mongo.cpp @@ -46,6 +46,18 @@ using namespace mongo; namespace po = boost::program_options; class TokuOplogTool : public Tool { + + BSONObj primaryKeyToIdKey(const BSONObj &pk) { + /* The last field of any primary key _must_ be the _id field. 
*/ + BSONElement e; + for (BSONObjIterator it(pk); it.more(); ++it) { + e = *it; + } + BSONObj o = BSON("_id" << e); + log() << "primaryKeyToIdKey: pk = " << pk << ", o = " << o << endl; + return o; + } + bool applyOps(BSONElement opsArray) { verify(opsArray.type() == mongo::Array); for (BSONObjIterator it(opsArray.Obj()); it.more(); ++it) { @@ -69,9 +81,9 @@ class TokuOplogTool : public Tool { _conn->remove(ns, oldObj, true); _conn->insert(ns, newObj); } else if (type == OP_STR_UPDATE_ROW_WITH_MOD) { - const BSONObj pk = op[KEY_STR_PK].Obj(); + const BSONObj id = primaryKeyToIdKey(op[KEY_STR_PK].Obj()); const BSONObj mods = op[KEY_STR_MODS].Obj(); - _conn->update(ns, pk, mods); + _conn->update(ns, id, mods); } else if (type == OP_STR_DELETE || type == OP_STR_CAPPED_DELETE) { const BSONObj obj = op[KEY_STR_ROW].Obj(); _conn->remove(ns, obj); From 8c1cbb02e69902b8d431373999e6260af18bc932 Mon Sep 17 00:00:00 2001 From: David Murphy Date: Mon, 14 Dec 2015 17:36:59 -0600 Subject: [PATCH 3/3] Removing patch file that should not have been included :( --- esmet.patch | 36 ------------------------------------ 1 file changed, 36 deletions(-) delete mode 100644 esmet.patch diff --git a/esmet.patch b/esmet.patch deleted file mode 100644 index 79eb16e4d6..0000000000 --- a/esmet.patch +++ /dev/null @@ -1,36 +0,0 @@ -diff --git a/src/mongo/tools/toku2mongo.cpp b/src/mongo/tools/toku2mongo.cpp -index 8e443b5..4ca658f 100644 ---- a/src/mongo/tools/toku2mongo.cpp -+++ b/src/mongo/tools/toku2mongo.cpp -@@ -46,6 +46,18 @@ using namespace mongo; - namespace po = boost::program_options; - - class TokuOplogTool : public Tool { -+ -+ BSONObj primaryKeyToIdKey(const BSONObj &pk) { -+ /* The last field of any primary key _must_ be the _id field. 
*/ -+ BSONElement e; -+ for (BSONObjIterator it(pk); it.more(); ++it) { -+ e = *it; -+ } -+ BSONObj o = BSON("_id" << e); -+ log() << "primaryKeyToIdKey: pk = " << pk << ", o = " << o << endl; -+ return o; -+ } -+ - bool applyOps(BSONElement opsArray) { - verify(opsArray.type() == mongo::Array); - for (BSONObjIterator it(opsArray.Obj()); it.more(); ++it) { -@@ -69,9 +81,9 @@ class TokuOplogTool : public Tool { - _conn->remove(ns, oldObj, true); - _conn->insert(ns, newObj); - } else if (type == OP_STR_UPDATE_ROW_WITH_MOD) { -- const BSONObj pk = op[KEY_STR_PK].Obj(); -+ const BSONObj id = primaryKeyToIdKey(op[KEY_STR_PK].Obj()); - const BSONObj mods = op[KEY_STR_MODS].Obj(); -- _conn->update(ns, pk, mods); -+ _conn->update(ns, id, mods); - } else if (type == OP_STR_DELETE || type == OP_STR_CAPPED_DELETE) { - const BSONObj obj = op[KEY_STR_ROW].Obj(); - _conn->remove(ns, obj); -