diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 81ae465cd2..9cbdbfbd25 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1846,3 +1846,7 @@ int taos_set_conn_mode(TAOS* taos, int mode, int value) { } return 0; } + +char* getBuildInfo(){ + return buildinfo; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 597d6035d6..15c5272f3c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -141,6 +141,7 @@ int32_t valueToString(void* k, char* buf); int32_t valueIsStale(void* k, int64_t ts); void destroyCompare(void* arg); +static void cleanDir(const char* pPath, const char* id); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, @@ -212,12 +213,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { char* chkp = taosMemoryCalloc(1, strlen(path) + 64); sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { - if (taosIsDir(state)) { - // remove dir if exists - // taosRenameFile(const char *oldName, const char *newName) - taosRemoveDir(state); - } - taosMkDir(state); + cleanDir(state, ""); code = backendCopyFiles(chkp, state); stInfo("copy snap file from %s to %s", chkp, state); if (code != 0) { @@ -322,6 +318,22 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } +void cleanDir(const char* pPath, const char* id) { + ASSERT(pPath != NULL); + + if (taosIsDir(pPath)) { + taosRemoveDir(pPath); + taosMkDir(pPath); + stInfo("%s clear dir:%s, succ", id, pPath); + } +} + +void validateDir(const char* pPath) { + if (!taosIsDir(pPath)) { + taosMulMkDir(pPath); + } +} + int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { int32_t code = 0; if (taosIsDir(chkptPath)) { @@ -329,11 +341,8 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t ch stDebug("remove local checkpoint data dir:%s succ", chkptPath); } - if (taosIsDir(defaultPath)) { - taosRemoveDir(defaultPath); - taosMulMkDir(defaultPath); - stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); - } + cleanDir(defaultPath, key); + stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); code = streamTaskDownloadCheckpointData(key, chkptPath); if (code != 0) { @@ -484,21 +493,14 @@ int32_t backendCopyFiles(const char* src, const char* dst) { static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, const char* defaultPath) { int32_t code = 0; - - if (taosIsDir(defaultPath)) { - taosRemoveDir(defaultPath); - taosMkDir(defaultPath); - stInfo("%s clear local backend dir:%s, succ", pTaskIdStr, defaultPath); - } + cleanDir(defaultPath, pTaskIdStr); if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId); code = backendCopyFiles(checkpointPath, defaultPath); if (code != TSDB_CODE_SUCCESS) { - taosRemoveDir(defaultPath); - taosMkDir(defaultPath); - + cleanDir(defaultPath, pTaskIdStr); stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); code = TSDB_CODE_SUCCESS; @@ -520,26 +522,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); - if (!taosIsDir(prefixPath)) { - code = taosMkDir(prefixPath); - ASSERT(code == 0); - } + validateDir(prefixPath); char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); - if (!taosIsDir(defaultPath)) { - taosMulMkDir(defaultPath); - } - + validateDir(defaultPath); int32_t pathLen = strlen(path) + 256; char* checkpointRoot = taosMemoryCalloc(1, pathLen); sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); - if (!taosIsDir(checkpointRoot)) { - taosMulMkDir(checkpointRoot); - } + validateDir(checkpointRoot); taosMemoryFree(checkpointRoot); stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); @@ -559,7 +553,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId code = 0; // reset the error code } } else { // no valid checkpoint id - stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key); + stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath); + cleanDir(defaultPath, key); } taosMemoryFree(chkptPath); @@ -2055,7 +2050,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { - stInfo("%s newly create db, need to restart", key); + stInfo("%s newly create db in state-backend", key); // pre create db pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); if (pTaskDb->db == NULL) goto _EXIT; @@ -2215,13 +2210,12 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { - taosRemoveDir(temp); - taosMkDir(temp); + cleanDir(temp, NULL); } else { taosMkDir(temp); } - code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp); + code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp); *path = temp; return code; diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 046cba8686..470e43c957 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -351,18 +351,18 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __m128i data1; - if (nbytes1 == 0) { - data1 = _mm_setzero_si128(); - } else { - memcpy(&data1, (const void*) (input + ipos), nbytes1); + __m128i data1 = _mm_setzero_si128(); + if (nbytes1 > 0) { + int64_t tmp = 0; + memcpy(&tmp, (const void*) (input + ipos), nbytes1); + data1 = _mm_set1_epi64x(tmp); } - __m128i data2; - if (nbytes2 == 0) { - data2 = _mm_setzero_si128(); - } else { - memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2); + __m128i data2 = _mm_setzero_si128(); + if (nbytes2 > 0) { + int64_t tmp = 0; + memcpy(&tmp, (const void*) (input + ipos + nbytes1), nbytes2); + data2 = _mm_set1_epi64x(tmp); } data2 = _mm_broadcastq_epi64(data2); @@ -399,19 +399,15 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __m128i data1; - if (nbytes1 == 0) { - data1 = _mm_setzero_si128(); - } else { + __m128i data1 = _mm_setzero_si128(); + if (nbytes1 > 0) { int64_t dd = 0; memcpy(&dd, (const void*) (input + ipos), nbytes1); data1 = _mm_loadu_si64(&dd); } - __m128i data2; - if (nbytes2 == 0) { - data2 = _mm_setzero_si128(); - } else { + __m128i data2 = _mm_setzero_si128(); + if (nbytes2 > 0) { int64_t dd = 0; memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2); data2 = _mm_loadu_si64(&dd); diff --git a/source/util/src/version.c.in b/source/util/src/version.c.in index 07bb1d42f8..c91b46e18d 100644 --- a/source/util/src/version.c.in +++ b/source/util/src/version.c.in @@ -4,7 +4,4 @@ char gitinfo[48] = "${TD_VER_GIT}"; char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}"; char buildinfo[64] = "${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}"; -void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; -char* getBuildInfo(){ - return buildinfo; -} +void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; \ No newline at end of file diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp index b3cf46fea6..dfcd682255 100644 --- a/source/util/test/decompressTest.cpp +++ b/source/util/test/decompressTest.cpp @@ -38,13 +38,13 @@ TEST(utilTest, decompress_ts_test) { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - int64_t tsList[4] = {1286, 1124, 2681, 2823}; + tsList[0] = 1286; tsList[1] = 1124; tsList[2]=2681; tsList[3] = 2823; - char* pOutput[4 * sizeof(int64_t)] = {0}; - int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4, +// char* pOutput[4 * sizeof(int64_t)] = {0}; + len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4, ONE_STAGE_COMP, NULL, 0); - char* decompOutput[4 * 8] = {0}; + decompOutput[4 * 8] = {0}; tsDecompressTimestamp(pOutput, len, 4, decompOutput, sizeof(int64_t) * 4, ONE_STAGE_COMP, NULL, 0); for (int32_t i = 0; i < 4; ++i) {