From 26adff1fa560e814e31df598d0f755e25ab2c965 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Feb 2022 19:32:48 +0800 Subject: [PATCH] minor changes --- source/common/src/ttime.c | 131 ++++++------- source/common/src/ttszip.c | 381 ++++++++++++++++++------------------- 2 files changed, 251 insertions(+), 261 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index ba1529ade7..460c4a6fc0 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -43,50 +43,46 @@ * An encoding of midnight at the end of the day as 24:00:00 - ie. midnight * tomorrow - (allowable under ISO 8601) is supported. */ -static int64_t user_mktime64(const unsigned int year0, const unsigned int mon0, - const unsigned int day, const unsigned int hour, - const unsigned int min, const unsigned int sec, int64_t time_zone) -{ - unsigned int mon = mon0, year = year0; +static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const uint32_t day, const uint32_t hour, + const uint32_t min, const uint32_t sec, int64_t time_zone) { + uint32_t mon = mon0, year = year0; /* 1..12 -> 11,12,1..10 */ - if (0 >= (int) (mon -= 2)) { - mon += 12; /* Puts Feb last since it has leap day */ + if (0 >= (int32_t)(mon -= 2)) { + mon += 12; /* Puts Feb last since it has leap day */ year -= 1; } - //int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) + - // year*365 - 719499)*24 + hour)*60 + min)*60 + sec); + // int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) + + // year*365 - 719499)*24 + hour)*60 + min)*60 + sec); int64_t res; - res = 367*((int64_t)mon)/12; - res += year/4 - year/100 + year/400 + day + ((int64_t)year)*365 - 719499; - res = res*24; - res = ((res + hour) * 60 + min) * 60 + sec; + res = 367 * ((int64_t)mon) / 12; + res += year / 4 - year / 100 + year / 400 + day + ((int64_t)year) * 365 - 719499; + res = res * 24; + res = ((res + hour) * 60 + min) * 60 + sec; return (res + time_zone); } // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; -void deltaToUtcInitOnce() { +void deltaToUtcInitOnce() { struct tm tm = {0}; - (void)strptime("1970-01-01 00:00:00", (const char *)("%Y-%m-%d %H:%M:%S"), &tm); + (void)strptime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); m_deltaUtc = (int64_t)mktime(&tm); - //printf("====delta:%lld\n\n", seconds); + // printf("====delta:%lld\n\n", seconds); } static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); -static char* forwardToTimeStringEnd(char* str); -static bool checkTzPresent(const char *str, int32_t len); +static char* forwardToTimeStringEnd(char* str); +static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = { - parseLocaltime, - parseLocaltimeDst -}; +static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime, + parseLocaltimeDst}; int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ @@ -103,9 +99,9 @@ bool checkTzPresent(const char* str, int32_t len) { char* seg = forwardToTimeStringEnd((char*)str); int32_t seg_len = len - (int32_t)(seg - str); - char *c = &seg[seg_len - 1]; - for (int i = 0; i < seg_len; ++i) { - if (*c == 'Z' || *c == 'z' || *c == '+' || *c == '-') { + char* c = &seg[seg_len - 1]; + for (int32_t i = 0; i < seg_len; ++i) { + if (*c == 'Z' || *c == 'z' || *c == '+' || *c == '-') { return true; } c--; @@ -199,13 +195,12 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { i += 2; } - //return error if there're illegal charaters after min(2 Digits) - char *minStr = &str[i]; + // return error if there're illegal charaters after min(2 Digits) + char* minStr = &str[i]; if (minStr[1] != '\0' && minStr[2] != '\0') { - return -1; + return -1; } - int64_t minute = strnatoi(&str[i], 2); if (minute > 59) { return -1; @@ -233,9 +228,8 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { * 2013-04-12T15:52:01.123+0800 */ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) { - - int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : - (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + int64_t factor = + (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); int64_t tzOffset = 0; struct tm tm = {0}; @@ -255,8 +249,8 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch /* mktime will be affected by TZ, set by using taos_options */ #ifdef WINDOWS - int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, 0); - //int64_t seconds = gmtime(&tm); + int64_t seconds = user_mktime64(tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, 0); + // int64_t seconds = gmtime(&tm); #else int64_t seconds = timegm(&tm); #endif @@ -320,8 +314,9 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { #endif #endif - int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, timezone); - + int64_t seconds = + user_mktime64(tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, timezone); + int64_t fraction = 0; if (*str == '.') { @@ -331,8 +326,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { } } - int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : - (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + int64_t factor = + (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); *time = factor * seconds + fraction; return 0; @@ -350,7 +345,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { /* mktime will be affected by TZ, set by using taos_options */ int64_t seconds = mktime(&tm); - + int64_t fraction = 0; if (*str == '.') { @@ -360,27 +355,22 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { } } - int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : - (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + int64_t factor = + (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); *time = factor * seconds + fraction; return 0; } int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { - assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || - fromPrecision == TSDB_TIME_PRECISION_MICRO || + assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || fromPrecision == TSDB_TIME_PRECISION_NANO); - assert(toPrecision == TSDB_TIME_PRECISION_MILLI || - toPrecision == TSDB_TIME_PRECISION_MICRO || + assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO || toPrecision == TSDB_TIME_PRECISION_NANO); - static double factors[3][3] = { {1., 1000., 1000000.}, - {1.0 / 1000, 1., 1000.}, - {1.0 / 1000000, 1.0 / 1000, 1.} }; + static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}}; return (int64_t)((double)time * factors[fromPrecision][toPrecision]); } static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { - switch (unit) { case 's': (*result) = convertTimePrecision(val * MILLISECOND_PER_SECOND, TSDB_TIME_PRECISION_MILLI, timePrecision); @@ -427,7 +417,8 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time * d - Days (24 hours) * w - Weeks (7 days) */ -int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) { +int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, + int32_t timePrecision) { errno = 0; char* endPtr = NULL; @@ -474,9 +465,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { } struct tm tm; - time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); + time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); localtime_r(&tt, &tm); - int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration; + int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)duration; tm.tm_year = mon / 12; tm.tm_mon = mon % 12; @@ -497,13 +488,13 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); struct tm tm; - time_t t = (time_t)skey; + time_t t = (time_t)skey; localtime_r(&t, &tm); - int smon = tm.tm_year * 12 + tm.tm_mon; + int32_t smon = tm.tm_year * 12 + tm.tm_mon; t = (time_t)ekey; localtime_r(&t, &tm); - int emon = tm.tm_year * 12 + tm.tm_mon; + int32_t emon = tm.tm_year * 12 + tm.tm_mon; if (unit == 'y') { interval *= 12; @@ -522,7 +513,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') { start /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); struct tm tm; - time_t tt = (time_t)start; + time_t tt = (time_t)start; localtime_r(&tt, &tm); tm.tm_sec = 0; tm.tm_min = 0; @@ -531,10 +522,10 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio if (pInterval->slidingUnit == 'y') { tm.tm_mon = 0; - tm.tm_year = (int)(tm.tm_year / pInterval->sliding * pInterval->sliding); + tm.tm_year = (int32_t)(tm.tm_year / pInterval->sliding * pInterval->sliding); } else { - int mon = tm.tm_year * 12 + tm.tm_mon; - mon = (int)(mon / pInterval->sliding * pInterval->sliding); + int32_t mon = tm.tm_year * 12 + tm.tm_mon; + mon = (int32_t)(mon / pInterval->sliding * pInterval->sliding); tm.tm_year = mon / 12; tm.tm_mon = mon % 12; } @@ -547,17 +538,17 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio start = (delta / pInterval->sliding + factor) * pInterval->sliding; if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') { - /* - * here we revised the start time of day according to the local time zone, - * but in case of DST, the start time of one day need to be dynamically decided. - */ + /* + * here we revised the start time of day according to the local time zone, + * but in case of DST, the start time of one day need to be dynamically decided. + */ // todo refactor to extract function that is available for Linux/Windows/Mac platform - #if defined(WINDOWS) && _MSC_VER >= 1900 +#if defined(WINDOWS) && _MSC_VER >= 1900 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 int64_t timezone = _timezone; int32_t daylight = _daylight; char** tzname = _tzname; - #endif +#endif start += (int64_t)(timezone * TSDB_TICK_PER_SECOND(precision)); } @@ -568,7 +559,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio if (start < 0 || INT64_MAX - start > pInterval->interval - 1) { end = start + pInterval->interval - 1; - while(end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window + while (end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window start += pInterval->sliding; if (start < 0 || INT64_MAX - start > pInterval->interval - 1) { @@ -601,8 +592,8 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio // and the parameter can also be a variable. const char* fmtts(int64_t ts) { static char buf[96]; - size_t pos = 0; - struct tm tm; + size_t pos = 0; + struct tm tm; if (ts > -62135625943 && ts < 32503651200) { time_t t = (time_t)ts; @@ -619,7 +610,7 @@ const char* fmtts(int64_t ts) { buf[pos++] = ' '; } pos += strftime(buf + pos, sizeof(buf), "ms=%Y-%m-%d %H:%M:%S", &tm); - pos += sprintf(buf + pos, ".%03d", (int)(ts % 1000)); + pos += sprintf(buf + pos, ".%03d", (int32_t)(ts % 1000)); } { @@ -631,7 +622,7 @@ const char* fmtts(int64_t ts) { buf[pos++] = ' '; } pos += strftime(buf + pos, sizeof(buf), "us=%Y-%m-%d %H:%M:%S", &tm); - pos += sprintf(buf + pos, ".%06d", (int)(ts % 1000000)); + pos += sprintf(buf + pos, ".%06d", (int32_t)(ts % 1000000)); } return buf; diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index a05a5c6ae1..a34895d264 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -19,7 +19,7 @@ #include "tcompression.h" static int32_t getDataStartOffset(); -static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo); +static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo); static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); @@ -36,7 +36,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { } pTSBuf->autoDelete = autoDelete; - + taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path); // pTSBuf->pFile = fopen(pTSBuf->path, "wb+"); pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); @@ -48,20 +48,20 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { if (!autoDelete) { remove(pTSBuf->path); } - + if (NULL == allocResForTSBuf(pTSBuf)) { return NULL; } - + // update the header info STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC}; STSBufUpdateHeader(pTSBuf, &header); - + tsBufResetPos(pTSBuf); pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->tsOrder = order; - + return pTSBuf; } @@ -72,23 +72,23 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { } pTSBuf->autoDelete = autoDelete; - + tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path)); - + // pTSBuf->pFile = fopen(pTSBuf->path, "rb+"); pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ); if (pTSBuf->pFile == NULL) { free(pTSBuf); return NULL; } - + if (allocResForTSBuf(pTSBuf) == NULL) { return NULL; } - + // validate the file magic number STSBufFileHeader header = {0}; - int32_t ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); + int32_t ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); UNUSED(ret); size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader)); UNUSED(sz); @@ -98,7 +98,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { tsBufDestroy(pTSBuf); return NULL; } - + if (header.numOfGroup > pTSBuf->numOfAlloc) { pTSBuf->numOfAlloc = header.numOfGroup; STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc); @@ -106,57 +106,58 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->pData = tmp; } - + pTSBuf->numOfGroups = header.numOfGroup; - + // check the ts order pTSBuf->tsOrder = header.tsOrder; if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) { -// tscError("invalid order info in buf:%d", pTSBuf->tsOrder); + // tscError("invalid order info in buf:%d", pTSBuf->tsOrder); tsBufDestroy(pTSBuf); return NULL; } - + size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups; - + STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize); if (buf == NULL) { tsBufDestroy(pTSBuf); - return NULL; - } - - //int64_t pos = ftell(pTSBuf->pFile); //pos not used + return NULL; + } + + // int64_t pos = ftell(pTSBuf->pFile); //pos not used sz = taosReadFile(pTSBuf->pFile, buf, infoSize); UNUSED(sz); - + // the length value for each vnode is not kept in file, so does not set the length value for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) { STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i]; memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo)); } free(buf); - + ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END); UNUSED(ret); - + int64_t file_size; if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->fileSize = (uint32_t)file_size; tsBufResetPos(pTSBuf); - + // ascending by default pTSBuf->cur.order = TSDB_ORDER_ASC; - -// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->pFile), -// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete); - + + // tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, + // fileno(pTSBuf->pFile), + // pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete); + return pTSBuf; } @@ -164,22 +165,22 @@ void* tsBufDestroy(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return NULL; } - + tfree(pTSBuf->assistBuf); tfree(pTSBuf->tsData.rawBuf); - + tfree(pTSBuf->pData); tfree(pTSBuf->block.payload); if (!pTSBuf->remainOpen) { taosCloseFile(&pTSBuf->pFile); } - + if (pTSBuf->autoDelete) { -// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); + // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); remove(pTSBuf->path); } else { -// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); + // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); } taosVariantDestroy(&pTSBuf->block.tag); @@ -189,7 +190,7 @@ void* tsBufDestroy(STSBuf* pTSBuf) { static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) { int32_t last = pTSBuf->numOfGroups - 1; - + assert(last >= 0); return &pTSBuf->pData[last]; } @@ -198,40 +199,40 @@ static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) { if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) { uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); assert((int32_t)newSize > pTSBuf->numOfAlloc); - + STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); if (tmp == NULL) { return NULL; } - + pTSBuf->pData = tmp; pTSBuf->numOfAlloc = newSize; memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups)); } - + if (pTSBuf->numOfGroups > 0) { STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - + // update prev vnode length info in file TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info); } - + // set initial value for vnode block STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info; pBlockInfo->id = id; pBlockInfo->offset = pTSBuf->fileSize; assert(pBlockInfo->offset >= getDataStartOffset()); - + // update vnode info in file TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo); - + // add one vnode info pTSBuf->numOfGroups += 1; - + // update the header info STSBufFileHeader header = { .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; - + STSBufUpdateHeader(pTSBuf, &header); return tsBufGetLastGroupInfo(pTSBuf); } @@ -240,7 +241,7 @@ static void shrinkBuffer(STSList* ptsData) { // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size if (ptsData->allocSize >= ptsData->threshold * 2) { char* rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE); - if(rawBuf) { + if (rawBuf) { ptsData->rawBuf = rawBuf; ptsData->allocSize = MEM_BUF_SIZE; } @@ -260,18 +261,17 @@ static void writeDataToDisk(STSBuf* pTSBuf) { if (pTSBuf->tsData.len == 0) { return; } - + STSBlock* pBlock = &pTSBuf->block; STSList* pTsData = &pTSBuf->tsData; pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE; - pBlock->compLen = - tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len/TSDB_KEYSIZE, pBlock->payload, pTsData->allocSize, - TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - + pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload, + pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); + int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET); assert(r == 0); - + /* * format for output data: * 1. tags, number of ts, size after compressed, payload, size after compressed @@ -289,10 +289,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) { } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); float tfloat = (float)pBlock->tag.d; - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t) pBlock->tag.nLen); + metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen); } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t) pBlock->tag.nLen); + metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen); } else { trueLen = 0; metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); @@ -303,19 +303,19 @@ static void writeDataToDisk(STSBuf* pTSBuf) { taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen); taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen)); - metaLen += (int32_t) taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); + metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); assert(metaLen == getTagAreaLength(&pBlock->tag)); int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; pTSBuf->fileSize += blockSize; - + pTSBuf->tsData.len = 0; - + STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - + pGroupBlockInfoEx->info.compLen += blockSize; pGroupBlockInfoEx->info.numOfBlocks += 1; - + shrinkBuffer(&pTSBuf->tsData); } @@ -326,7 +326,7 @@ static void expandBuffer(STSList* ptsData, int32_t inputSize) { if (tmp == NULL) { // todo } - + ptsData->rawBuf = tmp; ptsData->allocSize = newSize; } @@ -336,8 +336,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { STSBlock* pBlock = &pTSBuf->block; // clear the memory buffer - pBlock->compLen = 0; - pBlock->padding = 0; + pBlock->compLen = 0; + pBlock->padding = 0; pBlock->numOfElem = 0; int32_t offset = -1; @@ -347,11 +347,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { * set the right position for the reversed traverse, the reversed traverse is started from * the end of each comp data block */ - int32_t prev = -(int32_t) (sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen)); + int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen)); int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR); - size_t sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); + size_t sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - UNUSED(sz); + UNUSED(sz); pBlock->compLen = pBlock->padding; @@ -376,11 +376,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { UNUSED(sz); } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { float tfloat = 0; - sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t) pBlock->tag.nLen); + sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen); pBlock->tag.d = (double)tfloat; UNUSED(sz); - } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value - sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t) pBlock->tag.nLen); + } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { // TODO check the return value + sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen); UNUSED(sz); } @@ -395,7 +395,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); } - + // read the comp length at the length of comp block sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); assert(pBlock->padding == pBlock->compLen); @@ -409,24 +409,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { } UNUSED(sz); - + // for backwards traverse, set the start position at the end of previous block if (order == TSDB_ORDER_DESC) { int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR); UNUSED(r); } - + return pBlock; } // set the order of ts buffer if the ts order has not been set yet static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { STSList* ptsData = &pTSBuf->tsData; - + if (pTSBuf->tsOrder == -1) { if (ptsData->len > 0) { TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE); - + if (lastKey > *(TSKEY*)pData) { pTSBuf->tsOrder = TSDB_ORDER_DESC; } else { @@ -436,7 +436,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { // no data in current vnode, more than one ts is added, check the orders TSKEY k1 = *(TSKEY*)(pData); TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE); - + if (k1 < k2) { pTSBuf->tsOrder = TSDB_ORDER_ASC; } else if (k1 > k2) { @@ -448,23 +448,23 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { } else { // todo the timestamp order is set, check the asc/desc order of appended data } - + return TSDB_CODE_SUCCESS; } void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) { STSGroupBlockInfoEx* pBlockInfo = NULL; STSList* ptsData = &pTSBuf->tsData; - + if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); - + pBlockInfo = addOneGroupInfo(pTSBuf, id); } else { pBlockInfo = tsBufGetLastGroupInfo(pTSBuf); } - + assert(pBlockInfo->info.id == id); if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { @@ -476,22 +476,22 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, i taosVariantAssign(&pTSBuf->block.tag, tag); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); - + // todo check return value setCheckTSOrder(pTSBuf, pData, len); - + ptsData->len += len; pBlockInfo->len += len; - + pTSBuf->numOfTotal += len / TSDB_KEYSIZE; - + // the size of raw data exceeds the size of the default prepared buffer, so // during getBufBlock, the output buffer needs to be large enough. if (ptsData->len >= ptsData->threshold) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); } - + tsBufResetPos(pTSBuf); } @@ -499,15 +499,15 @@ void tsBufFlush(STSBuf* pTSBuf) { if (pTSBuf->tsData.len <= 0) { return; } - + writeDataToDisk(pTSBuf); shrinkBuffer(&pTSBuf->tsData); - + STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - + // update prev vnode length info in file TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info); - + // save the ts order into header STSBufFileHeader header = { .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; @@ -522,7 +522,7 @@ static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t num break; } } - + return j; } @@ -531,17 +531,17 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { return -1; } - + // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode int32_t i = 0; bool decomp = false; - + while ((i++) <= blockIndex) { if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) { return -1; } } - + // set the file position to be the end of previous comp block if (pTSBuf->cur.order == TSDB_ORDER_DESC) { STSBlock* pBlock = &pTSBuf->block; @@ -550,34 +550,34 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR); UNUSED(ret); } - + return 0; } static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) { bool decomp = false; - + int64_t offset = 0; if (pTSBuf->cur.order == TSDB_ORDER_ASC) { offset = pBlockInfo->offset; } else { // reversed traverse starts from the end of block offset = pBlockInfo->offset + pBlockInfo->compLen; } - + if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) { return -1; } - + for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) { if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) { return -1; } - + if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) { - return (pTSBuf->cur.order == TSDB_ORDER_ASC)? i: (pBlockInfo->numOfBlocks - (i + 1)); + return (pTSBuf->cur.order == TSDB_ORDER_ASC) ? i : (pBlockInfo->numOfBlocks - (i + 1)); } } - + return -1; } @@ -586,14 +586,14 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex if (pBlockInfo->numOfBlocks <= blockIndex) { assert(false); } - + STSCursor* pCur = &pTSBuf->cur; if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || - (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { + (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { int32_t i = 0; bool decomp = false; int32_t step = abs(blockIndex - pCur->blockIndex); - + while ((++i) <= step) { if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) { return; @@ -604,11 +604,11 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex assert(false); } } - + STSBlock* pBlock = &pTSBuf->block; - + size_t s = pBlock->numOfElem * TSDB_KEYSIZE; - + /* * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function @@ -616,16 +616,16 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex if (s > pTSBuf->tsData.allocSize) { expandBuffer(&pTSBuf->tsData, (int32_t)s); } - + pTSBuf->tsData.len = tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - + assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); - + pCur->vgroupIndex = groupIndex; pCur->blockIndex = blockIndex; - + pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; } @@ -647,7 +647,7 @@ STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) { if (j == -1) { return NULL; } - + return &pTSBuf->pData[j].info; } @@ -660,13 +660,14 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); if (r != 0) { -// qError("fseek failed, errno:%d", errno); + // qError("fseek failed, errno:%d", errno); return -1; } size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader)); - if (ws != 1) { -// qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, (int32_t)sizeof(STSBufFileHeader)); + if (ws != 1) { + // qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, + // (int32_t)sizeof(STSBufFileHeader)); return -1; } return 0; @@ -676,33 +677,33 @@ bool tsBufNextPos(STSBuf* pTSBuf) { if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) { return false; } - + STSCursor* pCur = &pTSBuf->cur; - + // get the first/last position according to traverse order if (pCur->vgroupIndex == -1) { if (pCur->order == TSDB_ORDER_ASC) { tsBufGetBlock(pTSBuf, 0, 0); - + if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return tsBufResetPos(pTSBuf); return false; } else { return true; } - + } else { // get the last timestamp record in the last block of the last vnode assert(pTSBuf->numOfGroups > 0); - + int32_t groupIndex = pTSBuf->numOfGroups - 1; pCur->vgroupIndex = groupIndex; - + int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); int32_t blockIndex = pBlockInfo->numOfBlocks - 1; - + tsBufGetBlock(pTSBuf, groupIndex, blockIndex); - + pCur->tsIndex = pTSBuf->block.numOfElem - 1; if (pTSBuf->block.numOfElem == 0) { tsBufResetPos(pTSBuf); @@ -712,16 +713,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) { } } } - + int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1; - + while (1) { assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE); - + if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) { int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; - + STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) || (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { @@ -730,15 +731,15 @@ bool tsBufNextPos(STSBuf* pTSBuf) { pCur->vgroupIndex = -1; return false; } - + if (pBlockInfo == NULL) { return false; } - + int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1); tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); break; - + } else { tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step); break; @@ -748,7 +749,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) { break; } } - + return true; } @@ -756,7 +757,7 @@ void tsBufResetPos(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return; } - + pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order}; } @@ -765,14 +766,14 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return elem1; } - + STSCursor* pCur = &pTSBuf->cur; if (pCur != NULL && pCur->vgroupIndex < 0) { return elem1; } STSBlock* pBlock = &pTSBuf->block; - + elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id; elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); elem1.tag = &pBlock->tag; @@ -791,65 +792,65 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) { return 0; } - + if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) { return -1; } - + // src can only have one vnode index assert(pSrcBuf->numOfGroups == 1); // there are data in buffer, flush to disk first tsBufFlush(pDestBuf); - + // compared with the last vnode id - int32_t id = tsBufGetLastGroupInfo((STSBuf*) pSrcBuf)->info.id; + int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id; if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) { int32_t oldSize = pDestBuf->numOfGroups; int32_t newSize = oldSize + pSrcBuf->numOfGroups; - + if (pDestBuf->numOfAlloc < newSize) { pDestBuf->numOfAlloc = newSize; - + STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); if (tmp == NULL) { return -1; } - + pDestBuf->pData = tmp; } - + // directly copy the vnode index information memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx)); - + // set the new offset value for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) { STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; pBlockInfoEx->info.id = id; } - + pDestBuf->numOfGroups = newSize; } else { STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf); - + pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.id = id; } - + int32_t r = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END); assert(r == 0); - + int64_t offset = getDataStartOffset(); int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset; int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size); - + if (written == -1 || written != size) { return -1; } - + pDestBuf->numOfTotal += pSrcBuf->numOfTotal; int32_t oldSize = pDestBuf->fileSize; @@ -864,7 +865,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { int64_t file_size; if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) { - return -1; + return -1; } pDestBuf->fileSize = (uint32_t)file_size; @@ -875,33 +876,33 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) { STSBuf* pTSBuf = tsBufCreate(true, order); - + STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info); pBlockInfo->numOfBlocks = numOfBlocks; pBlockInfo->compLen = len; pBlockInfo->offset = getDataStartOffset(); pBlockInfo->id = id; - + // update prev vnode length info in file TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo); - + int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET); if (ret == -1) { -// qError("fseek failed, errno:%d", errno); + // qError("fseek failed, errno:%d", errno); tsBufDestroy(pTSBuf); return NULL; } size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len); if (sz != len) { -// qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len); + // qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len); tsBufDestroy(pTSBuf); return NULL; } pTSBuf->fileSize += len; - + pTSBuf->tsOrder = order; assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - + STSBufFileHeader header = { .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; if (STSBufUpdateHeader(pTSBuf, &header) < 0) { @@ -910,42 +911,42 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ } // TODO taosFsync?? -// if (taosFsync(fileno(pTSBuf->pFile)) == -1) { -//// qError("fsync failed, errno:%d", errno); -// tsBufDestroy(pTSBuf); -// return NULL; -// } - + // if (taosFsync(fileno(pTSBuf->pFile)) == -1) { + //// qError("fsync failed, errno:%d", errno); + // tsBufDestroy(pTSBuf); + // return NULL; + // } + return pTSBuf; } STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) { STSElem elem = {.id = -1}; - + if (pTSBuf == NULL) { return elem; } - + int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id); if (j == -1) { return elem; } - + // for debug purpose // tsBufDisplay(pTSBuf); - + STSCursor* pCur = &pTSBuf->cur; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; - + int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); if (blockIndex < 0) { return elem; } - + pCur->vgroupIndex = j; pCur->blockIndex = blockIndex; tsBufGetBlock(pTSBuf, j, blockIndex); - + return tsBufGetElem(pTSBuf); } @@ -954,7 +955,7 @@ STSCursor tsBufGetCursor(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return c; } - + return pTSBuf->cur; } @@ -962,12 +963,12 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { if (pTSBuf == NULL || pCur == NULL) { return; } - + // assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); if (pCur->vgroupIndex != -1) { tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex); } - + pTSBuf->cur = *pCur; } @@ -975,7 +976,7 @@ void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) { if (pTSBuf == NULL) { return; } - + pTSBuf->cur.order = order; } @@ -992,19 +993,19 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) { void tsBufDisplay(STSBuf* pTSBuf) { printf("-------start of ts comp file-------\n"); printf("number of vnode:%d\n", pTSBuf->numOfGroups); - + int32_t old = pTSBuf->cur.order; pTSBuf->cur.order = TSDB_ORDER_ASC; - + tsBufResetPos(pTSBuf); - + while (tsBufNextPos(pTSBuf)) { STSElem elem = tsBufGetElem(pTSBuf); if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) { printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts); } } - + pTSBuf->cur.order = old; printf("-------end of ts comp file-------\n"); } @@ -1021,36 +1022,36 @@ static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInf static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { const int32_t INITIAL_GROUPINFO_SIZE = 4; - + pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE; pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx)); if (pTSBuf->pData == NULL) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); if (pTSBuf->tsData.rawBuf == NULL) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->bufSize = MEM_BUF_SIZE; pTSBuf->tsData.threshold = MEM_BUF_SIZE; pTSBuf->tsData.allocSize = MEM_BUF_SIZE; - + pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); if (pTSBuf->assistBuf == NULL) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->block.payload = malloc(MEM_BUF_SIZE); if (pTSBuf->block.payload == NULL) { tsBufDestroy(pTSBuf); return NULL; } - + pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; } @@ -1076,28 +1077,28 @@ void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) { (*id) = malloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t)); - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { (*id)[i] = pTSBuf->pData[i].info.id; } } int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) { assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups); - STSGroupBlockInfo *pBlockInfo = &pTSBuf->pData[groupIndex].info; + STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info; *len = 0; *numOfBlocks = 0; if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { - int code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); -// qError("%p: fseek failed: %s", pSql, tstrerror(code)); + int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); + // qError("%p: fseek failed: %s", pSql, tstrerror(code)); return code; } size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen); if (s != pBlockInfo->compLen) { - int code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); -// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); + int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); + // tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); return code; } @@ -1120,6 +1121,4 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) { return el; } -bool tsBufIsValidElem(STSElem* pElem) { - return pElem->id >= 0; -} +bool tsBufIsValidElem(STSElem* pElem) { return pElem->id >= 0; }