Skip to content

Commit c935700

Browse files
Parallel transform (#11)
Parallel transform. * Added timegm.h from FreeBSD sources, enabled it for the benchmark.
1 parent d21ae3d commit c935700

File tree

4 files changed

+183
-37
lines changed

4 files changed

+183
-37
lines changed

benchmark.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <print.h>
88
#include <transform.h>
99
#include <sort.h>
10+
#include <timegm.h>
1011

1112
#include <cmath>
1213

@@ -17,7 +18,8 @@
1718
//++++++++++++++++++++++++++++++
1819
// BENCHMARK
1920
//++++++++++++++++++++++++++++++
20-
std::shared_ptr<arrow::Table> taxi1(std::shared_ptr<arrow::Table> table) {
21+
std::shared_ptr<arrow::Table>
22+
taxi1(std::shared_ptr<arrow::Table> table) {
2123
//SELECT cab_type, count(cab_type)
2224
//FROM trips
2325
//GROUP BY cab_type;
@@ -27,7 +29,8 @@ std::shared_ptr<arrow::Table> taxi1(std::shared_ptr<arrow::Table> table) {
2729
return aggregate(table, taxi1_group_by, {&taxi1_task});
2830
}
2931

30-
std::shared_ptr<arrow::Table> taxi2(std::shared_ptr<arrow::Table> table) {
32+
std::shared_ptr<arrow::Table>
33+
taxi2(std::shared_ptr<arrow::Table> table) {
3134
//SELECT passenger_count, avg(total_amount)
3235
//FROM trips
3336
//GROUP BY passenger_count;
@@ -37,23 +40,27 @@ std::shared_ptr<arrow::Table> taxi2(std::shared_ptr<arrow::Table> table) {
3740
return aggregate(table, taxi2_group_by, {&taxi2_task});
3841
}
3942

40-
std::shared_ptr<arrow::Table> taxi3(std::shared_ptr<arrow::Table> table) {
43+
std::shared_ptr<arrow::Table>
44+
taxi3(std::shared_ptr<arrow::Table> table) {
4145
//SELECT passenger_count,
4246
// EXTRACT(year from pickup_datetime) as year,
4347
// count(*)
4448
//FROM trips
4549
//GROUP BY passenger_count,
4650
// year;
4751
printf("NAME: Taxi number 3\n");
48-
auto year = [](int64_t time) { time_t tt = static_cast<time_t>(time); return int64_t(gmtime(&tt)->tm_year + 1900); }; // gmtime (not localtime) because of python
52+
auto year = [](int64_t time) {
53+
time_t tt = static_cast<time_t>(time); struct tm r;
54+
return int64_t(_der_gmtime(tt, &r)->tm_year + 1900); }; // gmtime (not localtime) because of python
4955
auto taxi3_table = transform<int64_t, int64_t, arrow::TimestampArray, arrow::Int64Builder>(table, 2, year);
5056
group *taxi3_group_by = group_by(taxi3_table, {2, 10});
5157
aggregate_task taxi3_task = {count, 0};
5258
auto a = aggregate(taxi3_table, taxi3_group_by, {&taxi3_task});
5359
return sort(a, {0, 1}, {asc, asc}, flat); // Only one chunk for sort here, not a good checking - see sortAll
5460
}
5561

56-
std::shared_ptr<arrow::Table> taxi4(std::shared_ptr<arrow::Table> table) {
62+
std::shared_ptr<arrow::Table>
63+
taxi4(std::shared_ptr<arrow::Table> table) {
5764
//SELECT passenger_count,
5865
// EXTRACT(year from pickup_datetime) as year,
5966
// round(trip_distance) distance,
@@ -65,7 +72,9 @@ std::shared_ptr<arrow::Table> taxi4(std::shared_ptr<arrow::Table> table) {
6572
//ORDER BY year,
6673
// trips desc;
6774
printf("NAME: Taxi number 4\n");
68-
auto year = [](int64_t time) { time_t tt = static_cast<time_t>(time); return int64_t(gmtime(&tt)->tm_year + 1900); }; // gmtime (not localtime) because of python
75+
auto year = [](int64_t time) {
76+
time_t tt = static_cast<time_t>(time); struct tm r;
77+
return int64_t(_der_gmtime(tt, &r)->tm_year + 1900); }; // gmtime (not localtime) because of python
6978
auto taxi4_table = transform<int64_t, int64_t, arrow::TimestampArray, arrow::Int64Builder>(table, 2, year);
7079
auto taxi4_table1 = transform<double, double, arrow::DoubleArray, arrow::DoubleBuilder>(taxi4_table, 11, round);
7180
group *taxi4_group_by = group_by(taxi4_table1, {2, 10, 11});

group_by.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct position {
6868
#if 1 //USE_TBB
6969
struct mult_group_map_t : public tbb::concurrent_hash_map<position, int> {
7070
using tbb::concurrent_hash_map<position, int>::concurrent_hash_map;
71-
#if 0
71+
#if TBB_INTERFACE_VERSION < 11007
7272
const_pointer fast_find(const position& k) {
7373
return internal_fast_find(k);
7474
}

timegm.h

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright (c) 1997 Kungliga Tekniska Högskolan
3+
* (Royal Institute of Technology, Stockholm, Sweden).
4+
* All rights reserved.
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following conditions
8+
* are met:
9+
*
10+
* 1. Redistributions of source code must retain the above copyright
11+
* notice, this list of conditions and the following disclaimer.
12+
*
13+
* 2. Redistributions in binary form must reproduce the above copyright
14+
* notice, this list of conditions and the following disclaimer in the
15+
* documentation and/or other materials provided with the distribution.
16+
*
17+
* 3. Neither the name of the Institute nor the names of its contributors
18+
* may be used to endorse or promote products derived from this software
19+
* without specific prior written permission.
20+
*
21+
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
22+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24+
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
25+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
27+
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28+
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29+
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
30+
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31+
* SUCH DAMAGE.
32+
*/
33+
34+
#include <string.h>
#include <time.h>
35+
36+
/*
 * Far-future cut-off: _der_timegm compares tm_year against this value and
 * _der_gmtime compares the day count against ASN1_MAX_YEAR * 365.
 */
#define ASN1_MAX_YEAR 2000
37+
38+
/*
 * Report whether a year is a leap year under the Gregorian rules.
 *
 * y: the year as an offset from 1900 (1900 is added before testing), so
 *    70 means 1970 and 100 means 2000.
 *
 * Returns 1 for a leap year and 0 otherwise.
 */
static int
is_leap(unsigned y)
{
    y += 1900;  /* turn the offset into a calendar year */
    return (y % 4) == 0 && ((y % 100) != 0 || (y % 400) == 0);
}
44+
45+
/*
 * Days per month, indexed as ndays[leap][month]: row 0 holds a common
 * year, row 1 a leap year (February has 29 days), and month runs from
 * 0 to 11.
 */
static const unsigned ndays[2][12] = {
    {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31},
    {31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}
};
48+
49+
/*
50+
* This is a simplifed version of timegm(3) that doesn't accept out of
51+
* bound values that timegm(3) normally accepts but those are not
52+
* valid in asn1 encodings.
53+
*/
54+
55+
time_t
56+
_der_timegm (struct tm *tm)
57+
{
58+
time_t res = 0;
59+
int i;
60+
61+
/*
62+
* See comment in _der_gmtime
63+
*/
64+
if (tm->tm_year > ASN1_MAX_YEAR)
65+
return 0;
66+
67+
if (tm->tm_year < 0)
68+
return -1;
69+
if (tm->tm_mon < 0 || tm->tm_mon > 11)
70+
return -1;
71+
if (tm->tm_mday < 1 || tm->tm_mday > (int)ndays[is_leap(tm->tm_year)][tm->tm_mon])
72+
return -1;
73+
if (tm->tm_hour < 0 || tm->tm_hour > 23)
74+
return -1;
75+
if (tm->tm_min < 0 || tm->tm_min > 59)
76+
return -1;
77+
if (tm->tm_sec < 0 || tm->tm_sec > 59)
78+
return -1;
79+
80+
for (i = 70; i < tm->tm_year; ++i)
81+
res += is_leap(i) ? 366 : 365;
82+
83+
for (i = 0; i < tm->tm_mon; ++i)
84+
res += ndays[is_leap(tm->tm_year)][i];
85+
res += tm->tm_mday - 1;
86+
res *= 24;
87+
res += tm->tm_hour;
88+
res *= 60;
89+
res += tm->tm_min;
90+
res *= 60;
91+
res += tm->tm_sec;
92+
return res;
93+
}
94+
95+
struct tm *
96+
_der_gmtime(time_t t, struct tm *tm)
97+
{
98+
time_t secday = t % (3600 * 24);
99+
time_t days = t / (3600 * 24);
100+
101+
memset(tm, 0, sizeof(*tm));
102+
103+
tm->tm_sec = secday % 60;
104+
tm->tm_min = (secday % 3600) / 60;
105+
tm->tm_hour = secday / 3600;
106+
107+
/*
108+
* Refuse to calculate time ~ 2000 years into the future, this is
109+
* not possible for systems where time_t is a int32_t, however,
110+
* when time_t is a int64_t, that can happen, and this becomes a
111+
* denial of sevice.
112+
*/
113+
if (days > (ASN1_MAX_YEAR * 365))
114+
return NULL;
115+
116+
tm->tm_year = 70;
117+
while(1) {
118+
unsigned dayinyear = (is_leap(tm->tm_year) ? 366 : 365);
119+
if (days < dayinyear)
120+
break;
121+
tm->tm_year += 1;
122+
days -= dayinyear;
123+
}
124+
tm->tm_mon = 0;
125+
126+
while (1) {
127+
unsigned daysinmonth = ndays[is_leap(tm->tm_year)][tm->tm_mon];
128+
if (days < daysinmonth)
129+
break;
130+
days -= daysinmonth;
131+
tm->tm_mon++;
132+
}
133+
tm->tm_mday = days + 1;
134+
135+
return tm;
136+
}

transform.h

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#define TRANSFORM_H
33

44
#include <arrow/api.h>
5-
5+
#include <tbb/tbb.h>
66
#include <print.h>
77

88
//++++++++++++++++++++++++++++++
@@ -12,35 +12,36 @@
1212
// One column transformation at a time. Should aggregation be the same? No vector of tasks?
1313
template <typename T_c, typename T_n, typename T2, typename T4>
1414
std::shared_ptr<arrow::Table> transform(std::shared_ptr<arrow::Table> table, int column_id, T_n (*transformation)(T_c)) {
15-
printf("TASK: transforming of column\n");
16-
auto begin = std::chrono::steady_clock::now();
17-
18-
auto column = table->column(column_id)->data();
19-
arrow::ArrayVector new_chunks(column->num_chunks());
20-
std::shared_ptr<arrow::Array> data;
21-
for (int i = 0; i < column->num_chunks(); i++) {
22-
T4 bld; // template
23-
auto c = std::dynamic_pointer_cast<T2>(column->chunk(i)); // template
24-
if (c == NULL) {
25-
printf("Type of %d column is wrong!\n", column_id + 1);
26-
}
27-
// TBB in parallel for all available chunks or sequential for each incoming chunk
28-
for (int j = 0; j < c->length(); j++) {
29-
auto value = c->Value(j);
30-
T_n new_value = transformation(value);
31-
bld.Append(new_value); // resize?
32-
}
33-
bld.Finish(&data);
34-
new_chunks[i] = data;
35-
}
36-
37-
std::shared_ptr<arrow::Field> new_field = std::make_shared<arrow::Field>("name", data->type());
38-
std::shared_ptr<arrow::Column> new_column = std::make_shared<arrow::Column>(new_field, new_chunks);
39-
std::shared_ptr<arrow::Table> new_table;
40-
table->SetColumn(column_id, new_column, &new_table);
41-
42-
print_time(begin);
43-
return new_table;
15+
printf("TASK: transforming of column\n");
16+
auto begin = std::chrono::steady_clock::now();
17+
const auto column = table->column(column_id)->data();
18+
arrow::ArrayVector new_chunks(column->num_chunks());
19+
printf("Parallel: %d, chunk[0]->length: %d\n", column->num_chunks(), column->chunk(0)->length());
20+
tbb::parallel_for(0, column->num_chunks(), [&new_chunks, &column, column_id, transformation](int i){
21+
//for(int i = column->num_chunks()-1; i >= 0; i--){
22+
auto c = std::dynamic_pointer_cast<T2>(column->chunk(i)); // template
23+
if (c == NULL) {
24+
printf("Type of %d column is wrong!\n", column_id + 1); abort();
25+
}
26+
T4 bld; // template
27+
bld.Resize(c->length());
28+
for (int j = 0; j < c->length(); j++) {
29+
auto value = c->Value(j);
30+
auto new_value = transformation(value);
31+
bld.Append(new_value); // resize?
32+
}
33+
std::shared_ptr<arrow::Array> data;
34+
bld.Finish(&data);
35+
new_chunks[i] = data;
36+
});
37+
38+
std::shared_ptr<arrow::Field> new_field = std::make_shared<arrow::Field>("name", new_chunks[0]->type());
39+
std::shared_ptr<arrow::Column> new_column = std::make_shared<arrow::Column>(new_field, new_chunks);
40+
std::shared_ptr<arrow::Table> new_table;
41+
table->SetColumn(column_id, new_column, &new_table);
42+
43+
print_time(begin);
44+
return new_table;
4445
}
4546

4647
#endif

0 commit comments

Comments
 (0)