@@ -1721,7 +1721,7 @@ optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa
1721
1721
trans->StartMultiGlobal (dbid);
1722
1722
return true ;
1723
1723
case Transaction::LOCK_AHEAD:
1724
- trans->StartMultiLockedAhead (dbid, keys);
1724
+ trans->StartMultiLockedAhead (dbid, keys, true );
1725
1725
return true ;
1726
1726
case Transaction::NON_ATOMIC:
1727
1727
trans->StartMultiNonAtomic ();
@@ -1762,26 +1762,6 @@ std::pair<const CommandId*, CmdArgList> Service::FindCmd(CmdArgList args) const
1762
1762
return {res, args.subspan (1 )};
1763
1763
}
1764
1764
1765
- static bool CanRunSingleShardMulti (optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
1766
- const Transaction& tx) {
1767
- if (!sid.has_value ()) {
1768
- return false ;
1769
- }
1770
-
1771
- if (DetermineMultiMode (params) != Transaction::LOCK_AHEAD) {
1772
- return false ;
1773
- }
1774
-
1775
- if (tx.GetMultiMode () != Transaction::NOT_DETERMINED) {
1776
- // We may be running EVAL under MULTI. Currently RunSingleShardMulti() will attempt to lock
1777
- // keys, in which case will be already locked by MULTI. We could optimize this path as well
1778
- // though.
1779
- return false ;
1780
- }
1781
-
1782
- return true ;
1783
- }
1784
-
1785
1765
void Service::EvalInternal (CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
1786
1766
ConnectionContext* cntx) {
1787
1767
DCHECK (!eval_args.sha .empty ());
@@ -1806,20 +1786,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
1806
1786
sinfo = make_unique<ConnectionState::ScriptInfo>();
1807
1787
sinfo->keys .reserve (eval_args.keys .size ());
1808
1788
1809
- optional<ShardId> sid;
1810
- for (size_t i = 0 ; i < eval_args.keys .size (); ++i) {
1811
- string_view key = ArgS (eval_args.keys , i);
1812
- sinfo->keys .insert (KeyLockArgs::GetLockKey (key));
1813
-
1814
- ShardId cur_sid = Shard (key, shard_count ());
1815
- if (i == 0 ) {
1816
- sid = cur_sid;
1817
- }
1818
- if (sid.has_value () && *sid != cur_sid) {
1819
- sid = nullopt;
1820
- }
1821
- }
1822
-
1823
1789
sinfo->async_cmds_heap_limit = absl::GetFlag (FLAGS_multi_eval_squash_buffer);
1824
1790
Transaction* tx = cntx->transaction ;
1825
1791
CHECK (tx != nullptr );
@@ -1834,19 +1800,29 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
1834
1800
1835
1801
Interpreter::RunResult result;
1836
1802
1837
- if (CanRunSingleShardMulti (sid, *params, *tx)) {
1838
- // If script runs on a single shard, we run it remotely to save hops.
1803
+ optional<bool > scheduled = StartMultiEval (cntx->db_index (), eval_args.keys , *params, cntx);
1804
+ if (!scheduled) {
1805
+ return ;
1806
+ }
1807
+
1808
+ // If script runs on a single shard, we run it remotely to save hops
1809
+ if (!tx->IsScheduled () && tx->GetMultiMode () == Transaction::LOCK_AHEAD &&
1810
+ tx->GetUniqueShardCnt () == 1 ) {
1811
+ DCHECK (*scheduled); // because tx multi mode is lock ahead
1812
+ CHECK (!tx->IsScheduled ()); // skip_scheduling = true in StartMultiEval
1813
+
1839
1814
interpreter->SetRedisFunc ([cntx, tx, this ](Interpreter::CallArgs args) {
1840
1815
// Disable squashing, as we're using the squashing mechanism to run remotely.
1841
1816
args.async = false ;
1842
1817
CallFromScript (cntx, args);
1843
1818
});
1844
1819
1845
1820
++ServerState::tlocal ()->stats .eval_shardlocal_coordination_cnt ;
1846
- // TODO: remove doubule key iteration
1847
- tx->PrepareMultiForScheduleSingleHop (*sid, tx->GetDbIndex (), args);
1821
+
1822
+ auto sid = tx->GetUniqueShard ();
1823
+ tx->MultiBecomeSquasher ();
1848
1824
tx->ScheduleSingleHop ([&](Transaction*, EngineShard*) {
1849
- boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, * sid};
1825
+ boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, sid};
1850
1826
stub_tx->MultiUpdateWithParent (tx);
1851
1827
cntx->transaction = stub_tx.get ();
1852
1828
@@ -1856,16 +1832,14 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
1856
1832
return OpStatus::OK;
1857
1833
});
1858
1834
1859
- if (* sid != ServerState::tlocal ()->thread_index ()) {
1835
+ if (sid != ServerState::tlocal ()->thread_index ()) {
1860
1836
VLOG (1 ) << " Migrating connection " << cntx->conn () << " from "
1861
- << ProactorBase::me ()->GetPoolIndex () << " to " << * sid;
1862
- cntx->conn ()->RequestAsyncMigration (shard_set->pool ()->at (* sid));
1837
+ << ProactorBase::me ()->GetPoolIndex () << " to " << sid;
1838
+ cntx->conn ()->RequestAsyncMigration (shard_set->pool ()->at (sid));
1863
1839
}
1864
1840
} else {
1865
- optional<bool > scheduled = StartMultiEval (cntx->db_index (), eval_args.keys , *params, cntx);
1866
- if (!scheduled) {
1867
- return ;
1868
- }
1841
+ if (*scheduled && !tx->IsScheduled ())
1842
+ tx->Schedule ();
1869
1843
1870
1844
++ServerState::tlocal ()->stats .eval_io_coordination_cnt ;
1871
1845
interpreter->SetRedisFunc (
@@ -2080,6 +2054,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
2080
2054
if (scheduled && allow_squashing) {
2081
2055
MultiCommandSquasher::Execute (absl::MakeSpan (exec_info.body ), cntx, this );
2082
2056
} else {
2057
+ DCHECK (!scheduled || !delay_scheduling);
2083
2058
for (auto & scmd : exec_info.body ) {
2084
2059
VLOG (2 ) << " TX CMD " << scmd.Cid ()->name () << " " << scmd.NumArgs ();
2085
2060
0 commit comments