Merge branch '3.0' into fix/TD-30677-3.0
This commit is contained in:
commit
af2e6c6411
|
@ -1846,3 +1846,7 @@ int taos_set_conn_mode(TAOS* taos, int mode, int value) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
char* getBuildInfo(){
|
||||
return buildinfo;
|
||||
}
|
|
@ -141,6 +141,7 @@ int32_t valueToString(void* k, char* buf);
|
|||
int32_t valueIsStale(void* k, int64_t ts);
|
||||
|
||||
void destroyCompare(void* arg);
|
||||
static void cleanDir(const char* pPath, const char* id);
|
||||
|
||||
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
||||
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
||||
|
@ -212,12 +213,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
|||
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
|
||||
sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
|
||||
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
|
||||
if (taosIsDir(state)) {
|
||||
// remove dir if exists
|
||||
// taosRenameFile(const char *oldName, const char *newName)
|
||||
taosRemoveDir(state);
|
||||
}
|
||||
taosMkDir(state);
|
||||
cleanDir(state, "");
|
||||
code = backendCopyFiles(chkp, state);
|
||||
stInfo("copy snap file from %s to %s", chkp, state);
|
||||
if (code != 0) {
|
||||
|
@ -322,6 +318,22 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
|
|||
return complete == 1 ? 0 : -1;
|
||||
}
|
||||
|
||||
void cleanDir(const char* pPath, const char* id) {
|
||||
ASSERT(pPath != NULL);
|
||||
|
||||
if (taosIsDir(pPath)) {
|
||||
taosRemoveDir(pPath);
|
||||
taosMkDir(pPath);
|
||||
stInfo("%s clear dir:%s, succ", id, pPath);
|
||||
}
|
||||
}
|
||||
|
||||
void validateDir(const char* pPath) {
|
||||
if (!taosIsDir(pPath)) {
|
||||
taosMulMkDir(pPath);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
|
||||
int32_t code = 0;
|
||||
if (taosIsDir(chkptPath)) {
|
||||
|
@ -329,11 +341,8 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t ch
|
|||
stDebug("remove local checkpoint data dir:%s succ", chkptPath);
|
||||
}
|
||||
|
||||
if (taosIsDir(defaultPath)) {
|
||||
taosRemoveDir(defaultPath);
|
||||
taosMulMkDir(defaultPath);
|
||||
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
|
||||
}
|
||||
cleanDir(defaultPath, key);
|
||||
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
|
||||
|
||||
code = streamTaskDownloadCheckpointData(key, chkptPath);
|
||||
if (code != 0) {
|
||||
|
@ -484,21 +493,14 @@ int32_t backendCopyFiles(const char* src, const char* dst) {
|
|||
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
|
||||
const char* defaultPath) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (taosIsDir(defaultPath)) {
|
||||
taosRemoveDir(defaultPath);
|
||||
taosMkDir(defaultPath);
|
||||
stInfo("%s clear local backend dir:%s, succ", pTaskIdStr, defaultPath);
|
||||
}
|
||||
cleanDir(defaultPath, pTaskIdStr);
|
||||
|
||||
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
|
||||
stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
|
||||
|
||||
code = backendCopyFiles(checkpointPath, defaultPath);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosRemoveDir(defaultPath);
|
||||
taosMkDir(defaultPath);
|
||||
|
||||
cleanDir(defaultPath, pTaskIdStr);
|
||||
stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
|
||||
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
@ -520,26 +522,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
|
|||
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
|
||||
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
|
||||
|
||||
if (!taosIsDir(prefixPath)) {
|
||||
code = taosMkDir(prefixPath);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
validateDir(prefixPath);
|
||||
|
||||
char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256);
|
||||
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state");
|
||||
|
||||
if (!taosIsDir(defaultPath)) {
|
||||
taosMulMkDir(defaultPath);
|
||||
}
|
||||
|
||||
validateDir(defaultPath);
|
||||
int32_t pathLen = strlen(path) + 256;
|
||||
|
||||
char* checkpointRoot = taosMemoryCalloc(1, pathLen);
|
||||
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
|
||||
|
||||
if (!taosIsDir(checkpointRoot)) {
|
||||
taosMulMkDir(checkpointRoot);
|
||||
}
|
||||
validateDir(checkpointRoot);
|
||||
taosMemoryFree(checkpointRoot);
|
||||
|
||||
stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
|
||||
|
@ -559,7 +553,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
|
|||
code = 0; // reset the error code
|
||||
}
|
||||
} else { // no valid checkpoint id
|
||||
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key);
|
||||
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath);
|
||||
cleanDir(defaultPath, key);
|
||||
}
|
||||
|
||||
taosMemoryFree(chkptPath);
|
||||
|
@ -2055,7 +2050,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
|
|||
|
||||
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
|
||||
if (nCf == 0) {
|
||||
stInfo("%s newly create db, need to restart", key);
|
||||
stInfo("%s newly create db in state-backend", key);
|
||||
// pre create db
|
||||
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
|
||||
if (pTaskDb->db == NULL) goto _EXIT;
|
||||
|
@ -2215,13 +2210,12 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
|||
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
|
||||
|
||||
if (taosDirExist(temp)) {
|
||||
taosRemoveDir(temp);
|
||||
taosMkDir(temp);
|
||||
cleanDir(temp, NULL);
|
||||
} else {
|
||||
taosMkDir(temp);
|
||||
}
|
||||
code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
|
||||
|
||||
code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
|
||||
*path = temp;
|
||||
|
||||
return code;
|
||||
|
|
|
@ -351,18 +351,18 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen
|
|||
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
|
||||
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
|
||||
|
||||
__m128i data1;
|
||||
if (nbytes1 == 0) {
|
||||
data1 = _mm_setzero_si128();
|
||||
} else {
|
||||
memcpy(&data1, (const void*) (input + ipos), nbytes1);
|
||||
__m128i data1 = _mm_setzero_si128();
|
||||
if (nbytes1 > 0) {
|
||||
int64_t tmp = 0;
|
||||
memcpy(&tmp, (const void*) (input + ipos), nbytes1);
|
||||
data1 = _mm_set1_epi64x(tmp);
|
||||
}
|
||||
|
||||
__m128i data2;
|
||||
if (nbytes2 == 0) {
|
||||
data2 = _mm_setzero_si128();
|
||||
} else {
|
||||
memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2);
|
||||
__m128i data2 = _mm_setzero_si128();
|
||||
if (nbytes2 > 0) {
|
||||
int64_t tmp = 0;
|
||||
memcpy(&tmp, (const void*) (input + ipos + nbytes1), nbytes2);
|
||||
data2 = _mm_set1_epi64x(tmp);
|
||||
}
|
||||
|
||||
data2 = _mm_broadcastq_epi64(data2);
|
||||
|
@ -399,19 +399,15 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen
|
|||
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
|
||||
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
|
||||
|
||||
__m128i data1;
|
||||
if (nbytes1 == 0) {
|
||||
data1 = _mm_setzero_si128();
|
||||
} else {
|
||||
__m128i data1 = _mm_setzero_si128();
|
||||
if (nbytes1 > 0) {
|
||||
int64_t dd = 0;
|
||||
memcpy(&dd, (const void*) (input + ipos), nbytes1);
|
||||
data1 = _mm_loadu_si64(&dd);
|
||||
}
|
||||
|
||||
__m128i data2;
|
||||
if (nbytes2 == 0) {
|
||||
data2 = _mm_setzero_si128();
|
||||
} else {
|
||||
__m128i data2 = _mm_setzero_si128();
|
||||
if (nbytes2 > 0) {
|
||||
int64_t dd = 0;
|
||||
memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2);
|
||||
data2 = _mm_loadu_si64(&dd);
|
||||
|
|
|
@ -4,7 +4,4 @@ char gitinfo[48] = "${TD_VER_GIT}";
|
|||
char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}";
|
||||
char buildinfo[64] = "${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}";
|
||||
|
||||
void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};
|
||||
char* getBuildInfo(){
|
||||
return buildinfo;
|
||||
}
|
||||
void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};
|
|
@ -38,13 +38,13 @@ TEST(utilTest, decompress_ts_test) {
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
int64_t tsList[4] = {1286, 1124, 2681, 2823};
|
||||
tsList[0] = 1286; tsList[1] = 1124; tsList[2]=2681; tsList[3] = 2823;
|
||||
|
||||
char* pOutput[4 * sizeof(int64_t)] = {0};
|
||||
int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4,
|
||||
// char* pOutput[4 * sizeof(int64_t)] = {0};
|
||||
len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4,
|
||||
ONE_STAGE_COMP, NULL, 0);
|
||||
|
||||
char* decompOutput[4 * 8] = {0};
|
||||
decompOutput[4 * 8] = {0};
|
||||
tsDecompressTimestamp(pOutput, len, 4, decompOutput, sizeof(int64_t) * 4, ONE_STAGE_COMP, NULL, 0);
|
||||
|
||||
for (int32_t i = 0; i < 4; ++i) {
|
||||
|
|
Loading…
Reference in New Issue