Skip to content

Commit

Permalink
fix: better default timestamp generating (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Jul 3, 2020
1 parent 66ed5d0 commit 722c6a5
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 43 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Changelog

## Version 3.3.0 (in progress)
- [FIX] More precice default timestamp generating, up to microseconds

## Version 3.2.0 (2020-06-09)
- [NEW] Added possibility to read data from InfluxDB using Flux queries
- [NEW] `timeSync` utility function for synchronous time synchronization using NTP
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ InfluxDB client for Arduino can write data in batches. A batch is simply a set o
If using batch writes, the timestamp should be employed. Timestamp specifies the time where data was gathered and it is used in the form of a number of seconds (milliseconds, etc) from epoch (1.1.1970) UTC.
If points have no timestamp assigned, InfluxDB assigns timestamp at the time of writing, which could happen much later than the data has been obtained, because final batch write will happen when the batch is full (or when [flush buffer](#buffer-handling-and-retrying) is forced).
InfuxDB allows sending timestamp in various precisions - nanoseconds, microseconds, milliseconds or seconds. The milliseconds precision is usually enough for using on Arduino.
InfuxDB allows sending timestamp in various precisions - nanoseconds, microseconds, milliseconds or seconds. The milliseconds precision is usually enough for using on Arduino. Maximum avavailable precision is microseconds. Setting to nanosecond will just add zeroes for microseconds fraction.
The client has to be configured with time precision. The default settings is not using the timestamp. The `setWriteOptions` functions allow setting various parameters and one of them is __write precision__:
``` cpp
Expand All @@ -164,8 +164,10 @@ When a write precision is configured, the client will automatically assign curre

If you want to manage timestamp on your own, there are several ways how to set timestamp explicitly.
- `setTime(WritePrecision writePrecision)` - Sets timestamp to actual time in desired precision
- `setTime(unsigned long seconds)` - Sets timestamp in seconds since epoch. Write precision must be set to `S`
- `setTime(String timestamp)` - Set custom timestamp in precision specified in InfluxDBClient.
- `setTime(unsigned long long timestamp)` - Sets timestamp in an offset since epoch. Correct precision must be set InfluxDBClient::setWriteOptions.
- `setTime(String timestamp)` - Sets timestamp in an offset since epoch. Correct precision must be set InfluxDBClient::setWriteOptions.

The `getTime()` method allows copying timestamp between points.


### Configure Time
Expand Down
26 changes: 12 additions & 14 deletions src/InfluxDbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,30 @@ String Point::toLineProtocol() const {
}

void Point::setTime(WritePrecision precision) {
static char buff[10];
time_t now = time(nullptr);
struct timeval tv;
gettimeofday(&tv, NULL);

switch(precision) {
case WritePrecision::NS:
sprintf(buff, "%06d000", micros()%1000000uL);
_timestamp = String(now) + buff;
setTime(getTimeStamp(&tv,9));
break;
case WritePrecision::US:
sprintf(buff, "%06d", micros()%1000000uL);
_timestamp = String(now) + buff;
setTime(getTimeStamp(&tv,6));
break;
case WritePrecision::MS:
sprintf(buff, "%03d", millis()%1000u);
_timestamp = String(now) + buff;
case WritePrecision::MS:
setTime(getTimeStamp(&tv,3));
break;
case WritePrecision::S:
setTime(getTimeStamp(&tv,0));
break;
case WritePrecision::NoTime:
_timestamp = "";
break;
case WritePrecision::S:
_timestamp = String(now);
break;
}
}

void Point::setTime(unsigned long timestamp) {
_timestamp = String(timestamp);
void Point::setTime(unsigned long long timestamp) {
_timestamp = timeStampToString(timestamp);
}

void Point::setTime(String timestamp) {
Expand Down
10 changes: 6 additions & 4 deletions src/InfluxDbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class Point {
void addField(String name, const char *value);
// Set timestamp to `now()` and store it in specified precision, nanoseconds by default. Date and time must be already set. See `configTime` in the device API
void setTime(WritePrecision writePrecision = WritePrecision::NS);
// Set timestamp in seconds since epoch (1.1.1970). Precision should be set to `S`
void setTime(unsigned long seconds);
// Set timestamp in desired precision (specified in InfluxDBClient) since epoch (1.1.1970 00:00:00)
// Set timestamp in offset since epoch (1.1.1970). Correct precision must be set InfluxDBClient::setWriteOptions.
void setTime(unsigned long long timestamp);
// Set timestamp in offset since epoch (1.1.1970 00:00:00). Correct precision must be set InfluxDBClient::setWriteOptions.
void setTime(String timestamp);
// Clear all fields. Usefull for reusing point
void clearFields();
Expand All @@ -95,10 +95,12 @@ class Point {
bool hasFields() const { return _fields.length() > 0; }
// True if a point contains at least one tag
bool hasTags() const { return _tags.length() > 0; }
// True if a point contains timestamp
// True if a point contains timestamp
bool hasTime() const { return _timestamp.length() > 0; }
// Creates line protocol
String toLineProtocol() const;
// returns current timestamp
String getTime() const { return _timestamp; }
protected:
String _measurement;
String _tags;
Expand Down
27 changes: 27 additions & 0 deletions src/util/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,31 @@ void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2
time_t tnow = time(nullptr);
Serial.print("Synchronized time: ");
Serial.println(ctime(&tnow));
}

unsigned long long getTimeStamp(struct timeval *tv, int secFracDigits) {
unsigned long long tsVal = 0;
switch(secFracDigits) {
case 0:
tsVal = tv->tv_sec;
break;
case 6:
tsVal = tv->tv_sec * 1000000LL + tv->tv_usec;
break;
case 9:
tsVal = tv->tv_sec * 1000000000LL + tv->tv_usec * 1000LL;
break;
case 3:
default:
tsVal = tv->tv_sec * 1000LL + tv->tv_usec / 1000LL;
break;

}
return tsVal;
}

String timeStampToString(unsigned long long timestamp) {
static char buff[50];
snprintf(buff, 50, "%llu", timestamp);
return String(buff);
}
7 changes: 7 additions & 0 deletions src/util/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@
#define _INFLUXDB_CLIENT_HELPERS_H

#include <Arduino.h>
#include <sys/time.h>

// Synchronize time with NTP servers and waits for completition. Prints waiting progress and final synchronized time to the serial.
// Accurate time is necessary for certificate validion and writing points in batch
// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/
void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2 = nullptr, const char* ntpServer3 = nullptr);

// Create timestamp in offset from epoch. secFracDigits specify resulution. 0 - seconds, 3 - milliseconds, etc. Maximum and default is 9 - nanoseconds.
unsigned long long getTimeStamp(struct timeval *tv, int secFracDigits = 3);

// Converts unsigned long long timestamp to String
String timeStampToString(unsigned long long timestamp);

#endif //_INFLUXDB_CLIENT_HELPERS_H
12 changes: 8 additions & 4 deletions test/TestSupport.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#ifndef _TEST_SUPPORT_H_
#define _TEST_SUPPORT_H_

#define TEST_INIT(name) int temp = failures; const char *testName = name; do { Serial.println(testName)
#define TEST_END() } while(0); Serial.printf("%s %s\n",testName,failures == temp?"SUCCEEDED":"FAILED")
#define TEST_ASSERT(a) if(testAssert(__LINE__, (a))) break
#define TEST_ASSERTM(a,m) if(testAssertm(__LINE__, (a),(m))) break
#define TEST_INIT(name) int temp = failures;\
const char *testName = name; \
do { \
Serial.println(testName)
#define TEST_END() } while(0); \
end: Serial.printf("%s %s\n",testName,failures == temp?"SUCCEEDED":"FAILED")
#define TEST_ASSERT(a) if(testAssert(__LINE__, (a))) goto end
#define TEST_ASSERTM(a,m) if(testAssertm(__LINE__, (a),(m))) goto end

#include "query/FluxParser.h"

Expand Down
73 changes: 55 additions & 18 deletions test/test.ino
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ void setup() {
}

void loop() {
//tests
// tests
testPoint();
testFluxTypes();
testFluxParserEmpty();

testFluxParserSingleTable();
testFluxParserNilValue();
testFluxParserMultiTables(false);
Expand All @@ -68,11 +67,11 @@ void loop() {
testUserAgent();
testFailedWrites();
testTimestamp();
// testRetryOnFailedConnection();
// testBufferOverwriteBatchsize1();
// testBufferOverwriteBatchsize5();
// testServerTempDownBatchsize5();
// testRetriesOnServerOverload();
testRetryOnFailedConnection();
testBufferOverwriteBatchsize1();
testBufferOverwriteBatchsize5();
testServerTempDownBatchsize5();
testRetriesOnServerOverload();

Serial.printf("Test %s\n", failures ? "FAILED" : "SUCCEEDED");
while(1) delay(1000);
Expand Down Expand Up @@ -115,15 +114,22 @@ void testPoint() {

TEST_ASSERT(!p.hasTime());
time_t now = time(nullptr);
String snow(now);
p.setTime(now);
String testLineTime = testLine + " " + now;
String testLineTime = testLine + " " + snow;
line = p.toLineProtocol();
TEST_ASSERTM(line == testLineTime, line);

unsigned long long ts = now*1000000000LL+123456789;
p.setTime(ts);
testLineTime = testLine + " " + snow + "123456789";
line = p.toLineProtocol();
TEST_ASSERTM(line == testLineTime, line);

now += 10;
String nowStr(now);
p.setTime(nowStr);
testLineTime = testLine + " " + nowStr;
snow = now;
p.setTime(snow);
testLineTime = testLine + " " + snow;
line = p.toLineProtocol();
TEST_ASSERTM(line == testLineTime, line);

Expand All @@ -132,29 +138,29 @@ void testPoint() {
int partsCount;
String *parts = getParts(line, ' ', partsCount);
TEST_ASSERTM(partsCount == 3, String("3 != ") + partsCount);
TEST_ASSERT(parts[2].length() == nowStr.length());
TEST_ASSERT(parts[2].length() == snow.length());
delete[] parts;

p.setTime(WritePrecision::MS);
TEST_ASSERT(p.hasTime());
line = p.toLineProtocol();
parts = getParts(line, ' ', partsCount);
TEST_ASSERT(partsCount == 3);
TEST_ASSERT(parts[2].length() == nowStr.length() + 3);
TEST_ASSERT(parts[2].length() == snow.length() + 3);
delete[] parts;

p.setTime(WritePrecision::US);
line = p.toLineProtocol();
parts = getParts(line, ' ', partsCount);
TEST_ASSERT(partsCount == 3);
TEST_ASSERT(parts[2].length() == nowStr.length() + 6);
TEST_ASSERT(parts[2].length() == snow.length() + 6);
delete[] parts;

p.setTime(WritePrecision::NS);
line = p.toLineProtocol();
parts = getParts(line, ' ', partsCount);
TEST_ASSERT(partsCount == 3);
TEST_ASSERT(parts[2].length() == nowStr.length() + 9);
TEST_ASSERT(parts[2].length() == snow.length() + 9);
delete[] parts;

p.clearFields();
Expand Down Expand Up @@ -719,6 +725,31 @@ void testFailedWrites() {

void testTimestamp() {
TEST_INIT("testTimestamp");

struct timeval tv;
tv.tv_usec = 1234;
tv.tv_sec = 5678;
unsigned long long ts = getTimeStamp(&tv, 0);
TEST_ASSERTM( ts == 5678, timeStampToString(ts));
ts = getTimeStamp(&tv, 3);
TEST_ASSERTM( ts == 5678001, timeStampToString(ts));
ts = getTimeStamp(&tv, 6);
TEST_ASSERTM( ts == 5678001234, timeStampToString(ts));
ts = getTimeStamp(&tv, 9);
TEST_ASSERTM( ts == 5678001234000, timeStampToString(ts));

// Test increasing timestamp
String prev = "";
for(int i = 0;i<2000;i++) {
Point p("test");
p.setTime(WritePrecision::US);
String act = p.getTime();
TEST_ASSERTM( i == 0 || prev < act, String(i) + ": " + prev + " vs " + act);
prev = act;
delayMicroseconds(100);
}


serverLog(INFLUXDB_CLIENT_TESTING_URL, "testTimestamp");
InfluxDBClient client(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK);
client.setWriteOptions(WritePrecision::S, 1, 5);
Expand Down Expand Up @@ -977,6 +1008,7 @@ bool testFluxDateTimeValue(FluxQueryResult flux, int columnIndex, const char *c
TEST_ASSERTM(dt.microseconds == us, String(dt.microseconds) + " vs " + String(us));
return true;
} while(0);
end:
return false;
}

Expand All @@ -987,6 +1019,7 @@ bool testStringValue(FluxQueryResult flux, int columnIndex, const char *columnN
TEST_ASSERTM(flux.getValueByName(columnName).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
return true;
} while(0);
end:
return false;
}

Expand All @@ -997,12 +1030,12 @@ bool testStringVector(std::vector<String> vect, const char *values[], int size)
if(vect[i] != values[i]) {
Serial.print("assert failure: ");
Serial.println(vect[i]);
goto fail;
goto end;
}
}
return true;
} while(0);
fail:
end:
return false;
}

Expand All @@ -1013,6 +1046,7 @@ bool testDoubleValue(FluxQueryResult flux, int columnIndex, const char *columnN
TEST_ASSERTM(flux.getValueByName(columnName).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
return true;
} while(0);
end:
return false;
}

Expand All @@ -1023,6 +1057,7 @@ bool testLongValue(FluxQueryResult flux, int columnIndex, const char *columnNam
TEST_ASSERTM(flux.getValueByName(columnName).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
return true;
} while(0);
end:
return false;
}

Expand All @@ -1033,6 +1068,7 @@ bool testUnsignedLongValue(FluxQueryResult flux, int columnIndex, const char *c
TEST_ASSERTM(flux.getValueByName(columnName).getRawValue() == rawValue, flux.getValueByName(columnName).getRawValue());
return true;
} while(0);
end:
return false;
}

Expand All @@ -1046,6 +1082,7 @@ bool testTableColumns(FluxQueryResult flux, const char *columns[], int columnsC
TEST_ASSERTM(flux.getColumnIndex("x") == -1, "flux.getColumnIndex(\"x\")");
return true;
} while(0);
end:
return false;
}

Expand Down Expand Up @@ -1460,7 +1497,7 @@ void initInet() {
} else {
Serial.printf("Connected to: %s (%d)\n", WiFi.SSID().c_str(), WiFi.RSSI());

timeSync("CET-1CEST,M3.5.0,M10.5.0/3", "pool.ntp.org", "0.cz.pool.ntp.org", "1.cz.pool.ntp.org");
timeSync("CET-1CEST,M3.5.0,M10.5.0/3", "0.cz.pool.ntp.org", "1.cz.pool.ntp.org", "pool.ntp.org");

deleteAll(INFLUXDB_CLIENT_TESTING_URL);
}
Expand Down

0 comments on commit 722c6a5

Please sign in to comment.