Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pull request for until expert #1

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
/.vscode/
/.idea/
repeat-until.md
4 changes: 2 additions & 2 deletions base/type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,11 @@ obinstream& operator>>(obinstream& m, MSG_T& type);

enum class EXPERT_T : char {
INIT, AGGREGATE, AS, BRANCH, BRANCHFILTER, CAP, CONFIG, COUNT, DEDUP, GROUP, HAS, HASLABEL, INDEX,
IS, KEY, LABEL, MATH, ORDER, PROPERTY, RANGE, SELECT, TRAVERSAL, VALUES, WHERE, COIN, REPEAT, END
IS, KEY, LABEL, MATH, ORDER, PROPERTY, RANGE, SELECT, TRAVERSAL, VALUES, WHERE, COIN, REPEAT, UNTIL, END
};

static const char *ExpertType[] = { "INIT", "AGGREGATE", "AS", "BRANCH", "BRANCHFILTER", "CAP", "CONFIG", "COUNT", "DEDUP", "GROUP", "HAS",
"HASLABEL", "INDEX", "IS", "KEY", "LABEL", "MATH", "ORDER", "PROPERTY", "RANGE", "SELECT", "TRAVERSAL", "VALUES", "WHERE" , "COIN", "REPEAT", "END"};
"HASLABEL", "INDEX", "IS", "KEY", "LABEL", "MATH", "ORDER", "PROPERTY", "RANGE", "SELECT", "TRAVERSAL", "VALUES", "WHERE" , "COIN", "REPEAT", "UNTIL", "END"};

ibinstream& operator<<(ibinstream& m, const EXPERT_T& type);

Expand Down
1 change: 1 addition & 0 deletions core/experts_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ExpertAdapter {
experts_[EXPERT_T::TRAVERSAL] = unique_ptr<AbstractExpert>(new TraversalExpert(id ++, data_store_, num_thread_, mailbox_, core_affinity_));
experts_[EXPERT_T::VALUES] = unique_ptr<AbstractExpert>(new ValuesExpert(id ++, data_store_, node_.get_local_rank(), num_thread_, mailbox_, core_affinity_));
experts_[EXPERT_T::WHERE] = unique_ptr<AbstractExpert>(new WhereExpert(id ++, data_store_, num_thread_, mailbox_, core_affinity_));
experts_[EXPERT_T::UNTIL] = unique_ptr<AbstractExpert>(new UntilExpert(id ++, data_store_, num_thread_, mailbox_, core_affinity_));
// TODO(future) add more

timer::init_timers((experts_.size() + timer_offset) * num_thread_);
Expand Down
30 changes: 29 additions & 1 deletion core/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,28 @@ void Message::dispatch_data(Meta& m, const vector<Expert_Object>& experts, vecto
}
}

for (auto& item : id2data) {
/*
* convert the id2data to a vector
* if the message is until spawn message
* for each data point following any history create a message for it
* because his_index can only store one history which is created before until_spawn expert
*/
vector<pair<int, vector<pair<history_t, vector<value_t>>>>> id2data_vec;
if (experts[this->meta.step].expert_type == EXPERT_T::UNTIL && this->meta.msg_type == MSG_T::SPAWN) {
for (auto& item : id2data) {
for (auto& data_pair : item.second) {
vector<pair<history_t, vector<value_t>>> point_vec;
point_vec.push_back(move(data_pair));
id2data_vec.push_back(move(make_pair(item.first, move(point_vec))));
}
}
} else {
for (auto& item : id2data) {
id2data_vec.push_back(move(make_pair(item.first, move(item.second))));
}
}

for (auto& item : id2data_vec) { // use id2data vector instead of id2data
// insert data to msg
do {
Message msg(m);
Expand All @@ -357,6 +378,10 @@ void Message::dispatch_data(Meta& m, const vector<Expert_Object>& experts, vecto
// if(! route_assigned){
msg.meta.recver_tid = core_affinity->GetThreadIdForExpert(experts[m.step].expert_type);
// }

if (experts[this->meta.step].expert_type == EXPERT_T::UNTIL && this->meta.msg_type == MSG_T::SPAWN)
msg.meta.his_index = item.second[0].first.size() - 1; // save the index of shadow history and the last one is the data until insert

msg.InsertData(item.second);
vec.push_back(move(msg));
} while ((item.second.size() != 0)); // Data no consumed
Expand Down Expand Up @@ -401,6 +426,9 @@ bool Message::update_route(Meta& m, const vector<Expert_Object>& experts) {
m.branch_infos.pop_back();

return update_route(m, experts);
} else if (experts[this->meta.step].expert_type == EXPERT_T::UNTIL) {
m.msg_type = MSG_T::SPAWN;
return false;
} else {
// aggregate labelled branch experts to parent machine
m.recver_nid = m.branch_infos[branch_depth].node_id;
Expand Down
3 changes: 3 additions & 0 deletions core/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct Meta {
// experts chain
vector<Expert_Object> experts;

//store history length before until spawn function
int his_index;

std::string DebugString() const;
};

Expand Down
56 changes: 46 additions & 10 deletions core/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ string Parser::StepToStr(int step) {
step_str_map[GROUP] = "GROUP"; step_str_map[GROUPCOUNT] = "GROUPCOUNT"; step_str_map[HAS] = "HAS"; step_str_map[HASLABEL] = "HASLABEL"; step_str_map[HASKEY] = "HASKEY"; step_str_map[HASVALUE] = "HASVALUE"; step_str_map[HASNOT] = "HASNOT"; step_str_map[IS] = "IS"; step_str_map[KEY] = "KEY"; step_str_map[LABEL] = "LABEL"; step_str_map[LIMIT] = "LIMIT"; step_str_map[MAX] = "MAX";
step_str_map[MEAN] = "MEAN"; step_str_map[MIN] = "MIN"; step_str_map[NOT] = "NOT"; step_str_map[OR] = "OR"; step_str_map[ORDER] = "ORDER"; step_str_map[PROPERTIES] = "PROPERTIES"; step_str_map[RANGE] = "RANGE"; step_str_map[SELECT] = "SELECT"; step_str_map[SKIP] = "SKIP"; step_str_map[SUM] = "SUM"; step_str_map[UNION] = "UNION"; step_str_map[VALUES] = "VALUES"; step_str_map[WHERE] = "WHERE"; step_str_map[COIN] = "COIN"; step_str_map[REPEAT] = "REPEAT";

step_str_map[UNTIL] = "UNTIL";
return step_str_map[step];
}

Expand All @@ -334,6 +335,7 @@ void Parser::Clear() {
str2se.clear();
min_count_ = -1; // max of uint64_t
first_in_sub_ = 0;
repeat_sub_first_idx = -1;
}

void Parser::AppendExpert(Expert_Object& expert) {
Expand Down Expand Up @@ -607,6 +609,9 @@ void Parser::ParseSteps(const vector<pair<Step_T, string>>& tokens) {
// Where Expert
case WHERE:
ParseWhere(params); break;
// Until Expert
case UNTIL:
ParseUntil(params); break;
default:throw ParserException("Unexpected step");
}
}
Expand Down Expand Up @@ -1303,18 +1308,26 @@ void Parser::ParseCoin(const vector<string>& params) {
}

void Parser::ParseRepeat(const vector<string>& params) {
// @ Act just as union
Expert_Object expert(EXPERT_T::REPEAT);
// Expert_Object expert(EXPERT_T::BRANCH);
if (params.size() < 1) {
throw ParserException("expect at least one parameter for branch");
// @RepeatExpert: Not a physical expert
if (params.size() != 1) {
throw ParserException("expect exactly one parameter for repeat");
} else if (params[0].find("repeat") != string::npos) {
throw ParserException("do not support nested repeat for now");
} else if (repeat_sub_first_idx != -1) {
throw ParserException("do not support nested repeat for now");
}

int current = experts_.size();
AppendExpert(expert);
repeat_sub_first_idx = experts_.size();

IO_T current_type = io_type_;

// Parse sub query
ParseSub(params, current, false);
// Parse sub-query and add to experts_ list
DoParse(params[0]);

// check if sub-query's i_type and o_type match
if (current_type != io_type_) {
throw ParserException("expect same input type and output type in sub query of repeat");
}
}

void Parser::ParseSelect(const vector<string>& params) {
Expand Down Expand Up @@ -1417,6 +1430,28 @@ void Parser::ParseTraversal(const vector<string>& params, Step_T type) {
io_type_ = (outType == Element_T::EDGE) ? IO_T::EDGE : IO_T::VERTEX;
}

void Parser::ParseUntil(const vector<string>& params) {
// @UntilExpert params: (int next_repeat_pos)
// i_type = E/V, o_type = according to sub-query's o_type
if (params.size() != 1) {
throw ParserException("expect exactly one parameter for until");
} else if (params[0].find("repeat") != string::npos) {
throw ParserException("do not support nested repeat for now");
} else if (repeat_sub_first_idx == -1) {
throw ParserException("no matching repeat found for until");
}

Expert_Object expert(EXPERT_T::UNTIL);
expert.AddParam(repeat_sub_first_idx);

int current = experts_.size();
AppendExpert(expert);

// Parse sub query
ParseSub(params, current, true);
repeat_sub_first_idx = -1;
}

void Parser::ParseValues(const vector<string>& params) {
// @ValuesExpert params: (Element_t type, int pid...)
// i_type = VERTX/EDGE, o_type = according to pid
Expand Down Expand Up @@ -1535,7 +1570,8 @@ const map<string, Parser::Step_T> Parser::str2step = {
{ "values", VALUES },
{ "where", WHERE },
{ "coin", COIN },
{ "repeat", REPEAT }
{ "repeat", REPEAT },
{ "until": UNTIL }
};

const map<string, Predicate_T> Parser::str2pred = {
Expand Down
6 changes: 5 additions & 1 deletion core/parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Parser {
enum Step_T {
IN, OUT, BOTH, INE, OUTE, BOTHE, INV, OUTV, BOTHV, AND, AGGREGATE, AS, CAP, COUNT, DEDUP,
GROUP, GROUPCOUNT, HAS, HASLABEL, HASKEY, HASVALUE, HASNOT, IS, KEY, LABEL, LIMIT, MAX,
MEAN, MIN, NOT, OR, ORDER, PROPERTIES, RANGE, SELECT, SKIP, SUM, UNION, VALUES, WHERE, COIN, REPEAT
MEAN, MIN, NOT, OR, ORDER, PROPERTIES, RANGE, SELECT, SKIP, SUM, UNION, VALUES, WHERE, COIN, REPEAT, UNTIL
};

// for debug usage
Expand Down Expand Up @@ -72,6 +72,9 @@ class Parser {
IndexStore * index_store_;
string_index * indexes_;

// the index of first expert of repeat sub-query
int repeat_sub_first_idx;

// IO type checking
bool IsNumber();
bool IsValue(uint8_t& type);
Expand Down Expand Up @@ -151,6 +154,7 @@ class Parser {
void ParseTraversal(const vector<string>& params, Step_T type);
void ParseValues(const vector<string>& params);
void ParseWhere(const vector<string>& params);
void ParseUntil(const vector<string>& params);

public:
// Parse query string
Expand Down
Loading