Merge pull request #23831 from taosdata/fix/3_liaohj

enh(stream/tsdb): do multiple optimization and fix some bugs.
This commit is contained in:
Haojun Liao 2023-11-30 09:20:15 +08:00 committed by GitHub
commit 130614c938
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 624 additions and 320 deletions

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE FALSE) set(CMAKE_VERBOSE_MAKEFILE TRUE)
set(TD_BUILD_TAOSA_INTERNAL FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory #set output directory
@ -159,6 +159,7 @@ ELSE ()
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F) CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI) CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
IF (COMPILER_SUPPORT_SSE42) IF (COMPILER_SUPPORT_SSE42)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
@ -183,7 +184,13 @@ ELSE ()
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi")
MESSAGE(STATUS "avx512 supported by gcc") MESSAGE(STATUS "avx512f/avx512bmi supported by compiler")
ENDIF()
IF (COMPILER_SUPPORT_AVX512VL)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512vl")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl")
MESSAGE(STATUS "avx512vl supported by compiler")
ENDIF() ENDIF()
ENDIF() ENDIF()

View File

@ -668,6 +668,8 @@ typedef struct STaskStatusEntry {
int32_t relatedHTask; // has related fill-history task int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double sinkQuota; // existed quota size for sink task double sinkQuota; // existed quota size for sink task

View File

@ -139,6 +139,8 @@ int32_t getWordLength(char type);
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian);
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian);
/************************************************************************* /*************************************************************************
* STREAM COMPRESSION * STREAM COMPRESSION

View File

@ -126,9 +126,9 @@ void queryCallback(void* param, void* res, int32_t code) {
taos_fetch_raw_block_a(res, fetchCallback, param); taos_fetch_raw_block_a(res, fetchCallback, param);
} }
void createNewTable(TAOS* pConn, int32_t index) { void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t startTs, const char* pVarchar) {
char str[1024] = {0}; char str[1024] = {0};
sprintf(str, "create table tu%d using st2 tags(%d)", index, index); sprintf(str, "create table if not exists tu%d using st2 tags(%d)", index, index);
TAOS_RES* pRes = taos_query(pConn, str); TAOS_RES* pRes = taos_query(pConn, str);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -136,7 +136,8 @@ void createNewTable(TAOS* pConn, int32_t index) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 10000; i += 20) { if (startTs == 0) {
for (int32_t i = 0; i < numOfRows; i += 20) {
char sql[1024] = {0}; char sql[1024] = {0};
sprintf(sql, sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
@ -153,6 +154,26 @@ void createNewTable(TAOS* pConn, int32_t index) {
taos_free_result(p); taos_free_result(p);
} }
} else {
for (int32_t i = 0; i < numOfRows; i += 20) {
char sql[1024*50] = {0};
sprintf(sql,
"insert into tu%d values(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, "
"%d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, "
"'%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')",
index, startTs, i, pVarchar, startTs + 1, i + 1, pVarchar, startTs + 2, i + 2, pVarchar, startTs + 3, i + 3, pVarchar, startTs + 4, i + 4,
pVarchar, startTs + 5, i + 5, pVarchar, startTs + 6, i + 6, pVarchar, startTs + 7, i + 7, pVarchar, startTs + 8, i + 8, pVarchar, startTs + 9, i + 9,
pVarchar, startTs + 10, i + 10, pVarchar, startTs + 11, i + 11, pVarchar, startTs + 12, i + 12, pVarchar, startTs + 13, i + 13, pVarchar, startTs + 14,
i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18,
pVarchar, startTs + 19, i + 19, pVarchar);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
taos_free_result(p);
}
}
} }
void* queryThread(void* arg) { void* queryThread(void* arg) {
@ -808,14 +829,7 @@ TEST(clientCase, projection_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} }
@ -828,28 +842,32 @@ TEST(clientCase, projection_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
int64_t start = 1685959190000; int64_t start = 1685959190000;
const char* pstr =
"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefgh"
"ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnop"
"qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwx"
"yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef"
"ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:"
"QWERTYUIOP{}";
int32_t code = -1; for(int32_t i = 0; i < 10000; ++i) {
for(int32_t i = 0; i < 1000000; ++i) { char str[1024] = {0};
char t[512] = {0}; sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i);
sprintf(t, "insert into t1 values(now, %d)", i); TAOS_RES* px = taos_query(pConn, str);
while(1) { if (taos_errno(px) != 0) {
void* p = taos_query(pConn, t); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
code = taos_errno(p);
taos_free_result(p);
if (code != 0) {
printf("insert data error, retry\n");
} else {
break;
} }
taos_free_result(px);
}
for(int32_t j = 0; j < 5000; ++j) {
start += 20;
for (int32_t i = 0; i < 10000; ++i) {
createNewTable(pConn, i, 20, start, pstr);
} }
} }
for (int32_t i = 0; i < 1; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
// //
// pRes = taos_query(pConn, "select * from tu"); // pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {

View File

@ -52,7 +52,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock);
// for sma // for sma
// TODO refactor // TODO refactor

View File

@ -762,13 +762,14 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
} }
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SStreamObj * pStream = NULL; SStreamObj *pStream = NULL;
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
char *sql = NULL;
int32_t sqlLen = 0;
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
@ -799,9 +800,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
char* sql = NULL; if (createStreamReq.sql != NULL) {
int32_t sqlLen = 0;
if(createStreamReq.sql != NULL){
sqlLen = strlen(createStreamReq.sql); sqlLen = strlen(createStreamReq.sql);
sql = taosMemoryMalloc(sqlLen + 1); sql = taosMemoryMalloc(sqlLen + 1);
memset(sql, 0, sqlLen + 1); memset(sql, 0, sqlLen + 1);
@ -888,14 +887,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// reuse this function for stream // reuse this function for stream
if (sql != NULL && sqlLen > 0) { if (sql != NULL && sqlLen > 0) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
sqlLen); } else {
}
else{
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname); sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail)); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
} }
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
@ -904,7 +902,7 @@ _OVER:
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createStreamReq); tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj); tFreeStreamObj(&streamObj);
if(sql != NULL){ if (sql != NULL) {
taosMemoryFreeClear(sql); taosMemoryFreeClear(sql);
} }
return code; return code;
@ -1355,7 +1353,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
@ -1838,7 +1836,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
@ -1973,7 +1971,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
@ -2761,7 +2759,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
break; break;
} }
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false);
if (conflict) { if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb); pStream->name, pStream->sourceDb, pStream->targetDb);
@ -2797,7 +2795,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) {
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) { if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
int32_t numOfReady = 0; int32_t numOfReady = 0;
int32_t numOfTotal = 0; int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
@ -2940,6 +2938,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
bool snodeChanged = false; bool snodeChanged = false;
for (int32_t i = 0; i < req.numOfTasks; ++i) { for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) { if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
@ -2948,8 +2947,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage); updateStageInfo(pTaskEntry, p->stage);
if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; if(pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else { } else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) { if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) { if (activeCheckpointId != 0) {

View File

@ -35,17 +35,15 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pS
} }
int32_t clearFinishedTrans(SMnode* pMnode) { int32_t clearFinishedTrans(SMnode* pMnode) {
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
size_t keyLen = 0; size_t keyLen = 0;
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
taosThreadMutexLock(&execInfo.lock);
void* pIter = NULL; void* pIter = NULL;
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter;
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
// let's clear the finished trans // let's clear the finished trans
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
if (pTrans == NULL) { if (pTrans == NULL) {
void* pKey = taosHashGetKey(pEntry, &keyLen); void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name // key is the name of src/dst db name
@ -60,44 +58,55 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
} }
size_t num = taosArrayGetSize(pList); size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SKeyInfo* pKey = taosArrayGet(pList, i); SKeyInfo* pKey = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
} }
mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
taosThreadMutexUnlock(&execInfo.lock);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList); taosArrayDestroy(pList);
return 0; return 0;
} }
bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) {
clearFinishedTrans(pMnode); if (lock) {
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
}
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) { if (num <= 0) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
return false; return false;
} }
clearFinishedTrans(pMnode);
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
if (pEntry != NULL) { if (pEntry != NULL) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true; return true;
} }
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) { if (pEntry != NULL) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true; return true;
} }
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
return false; return false;
} }

View File

@ -93,7 +93,11 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VNODE_TQ_STREAM "stream" #define VNODE_TQ_STREAM "stream"
#if SUSPEND_RESUME_TEST // only for test purpose
#define VNODE_BUFPOOL_SEGMENTS 1
#else
#define VNODE_BUFPOOL_SEGMENTS 3 #define VNODE_BUFPOOL_SEGMENTS 3
#endif
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"

View File

@ -1415,6 +1415,9 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
// clear the scheduler status
streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -48,7 +48,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -168,7 +169,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
@ -291,11 +292,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
} }
static int32_t tsdbInitReaderLock(STsdbReader* pReader) { static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
@ -324,22 +321,14 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) {
} }
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
} }
static int32_t tsdbReleaseReader(STsdbReader* pReader) { static int32_t tsdbReleaseReader(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
@ -432,6 +421,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
} }
tsdbInitReaderLock(pReader); tsdbInitReaderLock(pReader);
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
*ppReader = pReader; *ppReader = pReader;
return code; return code;
@ -1015,8 +1005,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// check if current block are all handled // check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (outOfTimeWindow(ts, if (outOfTimeWindow(ts, &pReader->info.window)) {
&pReader->info.window)) { // the remain data has out of query time window, ignore current block // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->info.order); setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
} }
} else { } else {
@ -1123,16 +1113,12 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
} }
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
STableDataBlockIdx* pTableDataBlockIdx = STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
memcpy(pRecord, &p->record, sizeof(SBrinRecord)); memcpy(pRecord, &p->record, sizeof(SBrinRecord));
*nextIndex = pBlockInfo->tbBlockIdx + step; *nextIndex = pBlockInfo->tbBlockIdx + step;
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return true; return true;
} }
@ -1376,23 +1362,19 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]); double el = (taosGetTimestampUs() - st) / 1000.0;
pBlock->info.id.uid = pBlockScanInfo->uid; updateComposedBlockInfo(pReader, el, pBlockScanInfo);
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s", " - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr); pBlockScanInfo->uid, pReader->idStr);
pReader->cost.buildmemBlock += elapsedTime; pReader->cost.buildmemBlock += el;
return code; return code;
} }
@ -2293,13 +2275,12 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
return code; return code;
} }
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1; pResBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
pReader->cost.composedBlocks += 1; pReader->cost.composedBlocks += 1;
@ -2356,7 +2337,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = *pReader->status.pTableIter; pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables && if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
return code; return code;
} }
} }
@ -2436,7 +2416,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
} }
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) { int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost) {
int32_t code = 0; int32_t code = 0;
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData); int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData);
if (newDelDataInFile == 0 && if (newDelDataInFile == 0 &&
@ -2935,6 +2915,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
tsdbDebug("seq load data blocks from cache, %s", pReader->idStr);
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
@ -3043,6 +3025,8 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
tsdbDebug("seq load data blocks from stt files %s", pReader->idStr);
while (1) { while (1) {
terrno = 0; terrno = 0;
@ -3774,7 +3758,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
do { do {
// SRow* pTSRow = NULL;
TSDBROW row = {.type = -1}; TSDBROW row = {.type = -1};
bool freeTSRow = false; bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
@ -3783,6 +3766,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
} }
if (row.type == TSDBROW_ROW_FMT) { if (row.type == TSDBROW_ROW_FMT) {
int64_t ts = row.pTSRow->ts;;
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) { if (freeTSRow) {
@ -3792,13 +3776,17 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (code) { if (code) {
return code; return code;
} }
pBlockScanInfo->lastProcKey = ts;
} else { } else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) { if (code) {
break; break;
} }
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
} }
// no data in buffer, return immediately // no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
break; break;
@ -4107,7 +4095,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
} }
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
@ -4122,6 +4110,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
tsem_destroy(&pReader->resumeAfterSuspend);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader); tsdbUninitReaderLock(pReader);
@ -4154,6 +4143,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
pReader->status.suspendInvoked = true; // record the suspend status
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
@ -4167,84 +4158,34 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
}
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t iter = 0;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
clearBlockScanInfo(pInfo);
pInfo->iterInit = false; pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
} }
if (pInfo->iiter.iter != NULL) { pStatus->uidList.currentIndex = 0;
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); initReaderStatus(pStatus);
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData);
}
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}
if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
}
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;
pBlockScanInfo->iter.hasVal = false;
pBlockScanInfo->iiter.hasVal = false;
if (pBlockScanInfo->iter.iter != NULL) {
pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
}
if (pBlockScanInfo->iiter.iter != NULL) {
pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
}
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList);
// TODO: keep skyline for reuse
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
}
}
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
#if SUSPEND_RESUME_TEST
tsem_post(&pReader->resumeAfterSuspend);
#endif
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
pReader->idStr); pReader->idStr);
return code; return code;
@ -4399,6 +4340,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data
// the version range of query will be used to identify the correctness of suspend/resume functions.
// this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
#if SUSPEND_RESUME_TEST
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
tsem_wait(&pReader->resumeAfterSuspend);
}
#endif
code = tsdbAcquireReader(pReader); code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

View File

@ -210,6 +210,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false; p->iterInit = false;
p->iter.hasVal = false; p->iter.hasVal = false;
p->iiter.hasVal = false; p->iiter.hasVal = false;
p->sttKeyInfo.status = STT_FILE_READER_UNINIT;
if (p->iter.iter != NULL) { if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);

View File

@ -96,7 +96,7 @@ typedef struct SResultBlockInfo {
int64_t capacity; int64_t capacity;
} SResultBlockInfo; } SResultBlockInfo;
typedef struct SCostSummary { typedef struct SReadCostSummary {
int64_t numOfBlocks; int64_t numOfBlocks;
double blockLoadTime; double blockLoadTime;
double buildmemBlock; double buildmemBlock;
@ -110,7 +110,7 @@ typedef struct SCostSummary {
double createScanInfoList; double createScanInfoList;
double createSkylineIterTime; double createSkylineIterTime;
double initLastBlockReader; double initLastBlockReader;
} SCostSummary; } SReadCostSummary;
typedef struct STableUidList { typedef struct STableUidList {
uint64_t* tableUidList; // access table uid list in uid ascending order list uint64_t* tableUidList; // access table uid list in uid ascending order list
@ -122,12 +122,6 @@ typedef struct {
int32_t numOfSttFiles; int32_t numOfSttFiles;
} SBlockNumber; } SBlockNumber;
typedef struct SBlockIndex {
int32_t ordinalIndex;
int64_t inFileOffset;
STimeWindow window; // todo replace it with overlap flag.
} SBlockIndex;
typedef struct SBlockOrderWrapper { typedef struct SBlockOrderWrapper {
int64_t uid; int64_t uid;
int64_t offset; int64_t offset;
@ -192,6 +186,7 @@ typedef struct SFileBlockDumpInfo {
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
typedef struct SReaderStatus { typedef struct SReaderStatus {
bool suspendInvoked;
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not bool composedDataBlock; // the returned data block is a composed block or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo> SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
@ -220,7 +215,8 @@ struct STsdbReader {
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap; STsdbReadSnap* pReadSnap;
SCostSummary cost; tsem_t resumeAfterSuspend;
SReadCostSummary cost;
SHashObj** pIgnoreTables; SHashObj** pIgnoreTables;
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFileReader* pFileReader; // the file reader SDataFileReader* pFileReader; // the file reader

View File

@ -254,12 +254,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
const char* id = pTask->id.idStr;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
while (1) { while (1) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2 // quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
} }
@ -267,8 +268,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
tstrerror(terrno));
continue; continue;
} }
@ -281,12 +281,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
} }
// dispatch the generated results // dispatch the generated results
int32_t code = handleResultBlocks(pTask, pRes, size); /*int32_t code = */handleResultBlocks(pTask, pRes, size);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
// downstream task input queue is full, try in 5sec // downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
} }
@ -294,9 +294,9 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
} }
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
} }
} }
@ -543,7 +543,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec. * appropriate batch of blocks should be handled in 5 to 10 sec.
*/ */
int32_t streamExecForAll(SStreamTask* pTask) { int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
@ -654,7 +654,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) { while (1) {
int32_t code = streamExecForAll(pTask); int32_t code = doStreamExecTask(pTask);
if (code < 0) { // todo this status should be removed if (code < 0) { // todo this status should be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1; return -1;

View File

@ -471,6 +471,14 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
@ -1072,8 +1080,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { if (numOfRecv == numOfTotal) {
pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
@ -1087,6 +1096,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo); streamMetaResetStartInfo(pStartInfo);
} else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);

View File

@ -538,6 +538,11 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
memcpy(output, input + 1, nelements * longBytes); memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes; return nelements * longBytes;
} else if (input[0] == 1) { // Decompress } else if (input[0] == 1) { // Decompress
if (tsSIMDEnable && tsAVX512Enable) {
tsDecompressTimestampAvx512(input, nelements, output, false);
} else if (tsSIMDEnable && tsAVX2Enable) {
tsDecompressTimestampAvx2(input, nelements, output, false);
} else {
int64_t *ostream = (int64_t *)output; int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0; int32_t ipos = 1, opos = 0;
@ -561,6 +566,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
} }
delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
} }
ipos += nbytes; ipos += nbytes;
if (opos == 0) { if (opos == 0) {
prev_value = delta_of_delta; prev_value = delta_of_delta;
@ -593,11 +599,10 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
ostream[opos++] = prev_value; ostream[opos++] = prev_value;
if (opos == nelements) return nelements * longBytes; if (opos == nelements) return nelements * longBytes;
} }
} else {
ASSERT(0);
return -1;
} }
}
return nelements * longBytes;
} }
/* --------------------------------------------Double Compression ---------------------------------------------- */ /* --------------------------------------------Double Compression ---------------------------------------------- */

View File

@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int64_t prevValue = 0; int64_t prevValue = 0;
#if __AVX2__ #if __AVX2__
while (1) { while (_pos < nelements) {
if (_pos == nelements) break; uint64_t w = *(uint64_t*) ip;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
char selector = (char)(w & INT64MASK(4)); // selector = 4 char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3 char bit = bit_per_integer[(int32_t)selector]; // bit = 3
@ -114,7 +111,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);
// get the four zigzag values here // get four zigzag values here
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
// calculate the cumulative sum (prefix sum) for each number // calculate the cumulative sum (prefix sum) for each number
@ -250,73 +247,264 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen
return 0; return 0;
} }
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output,
bool bigEndian) {
#if 0
int64_t *ostream = (int64_t *)output; int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0; int32_t ipos = 1, opos = 0;
int8_t nbytes = 0; __m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();
int64_t prevValue = 0;
int64_t prevDelta = 0;
int64_t deltaOfDelta = 0;
int32_t longBytes = LONG_BYTES;
#if __AVX2__ #if __AVX2__
int32_t batch = nelements >> 1;
int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};
int32_t batch = nelements >> 2; int32_t i = 0;
int32_t remainder = nelements & 0x1; if (batch > 1) {
// first loop
while (1) {
uint8_t flags = input[ipos++]; uint8_t flags = input[ipos++];
// Decode dd1 int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
uint64_t dd1 = 0; int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
nbytes = flags & INT8MASK(4);
__m128i data1;
if (nbytes1 == 0) {
data1 = _mm_setzero_si128();
} else {
memcpy(&data1, (const void*) (input + ipos), nbytes1);
}
__m128i data2;
if (nbytes2 == 0) {
data2 = _mm_setzero_si128();
} else {
memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2);
}
data2 = _mm_broadcastq_epi64(data2);
__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);
// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);
__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);
__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);
// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);
// keep the previous delta of delta, for the first item
prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE);
opos += 2;
ipos += nbytes1 + nbytes2;
i += 1;
}
// the remain
for(; i < batch; ++i) {
uint8_t flags = input[ipos++];
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
// __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
// __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
__m128i data1;
if (nbytes1 == 0) {
data1 = _mm_setzero_si128();
} else {
int64_t dd = 0;
memcpy(&dd, (const void*) (input + ipos), nbytes1);
data1 = _mm_loadu_si64(&dd);
}
__m128i data2;
if (nbytes2 == 0) {
data2 = _mm_setzero_si128();
} else {
int64_t dd = 0;
memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2);
data2 = _mm_loadu_si64(&dd);
}
data2 = _mm_broadcastq_epi64(data2);
__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);
// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);
__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);
__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);
// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);
// keep the previous delta of delta
__m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta);
prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE);
opos += 2;
ipos += nbytes1 + nbytes2;
}
if (remainder > 0) {
uint64_t dd = 0;
uint8_t flags = input[ipos++];
int32_t nbytes = flags & INT8MASK(4);
int64_t deltaOfDelta = 0;
if (nbytes == 0) { if (nbytes == 0) {
deltaOfDelta = 0; deltaOfDelta = 0;
} else { } else {
if (bigEndian) { // if (is_bigendian()) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); // memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else { // } else {
memcpy(&dd1, input + ipos, nbytes); memcpy(&dd, input + ipos, nbytes);
} // }
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1); deltaOfDelta = ZIGZAG_DECODE(int64_t, dd);
} }
ipos += nbytes; ipos += nbytes;
prevDelta += deltaOfDelta; if (opos == 0) {
prevValue += prevDelta; ostream[opos++] = deltaOfDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else { } else {
if (bigEndian) { int64_t prevDeltaX = deltaOfDelta + prevDelta[1];
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); ostream[opos++] = prevVal[1] + prevDeltaX;
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
} }
} }
#endif
#endif
return 0;
}
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
#if __AVX512VL__
__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();
int32_t numOfBatch = nelements >> 1;
int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};
int32_t i = 0;
if (numOfBatch > 1) {
// first loop
uint8_t flags = input[ipos++];
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
__m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
__m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
data2 = _mm_broadcastq_epi64(data2);
__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);
// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);
__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);
__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);
// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);
// keep the previous delta of delta, for the first item
prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE);
opos += 2;
ipos += nbytes1 + nbytes2;
i += 1;
}
// the remain
for(; i < numOfBatch; ++i) {
uint8_t flags = input[ipos++];
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);
__m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
__m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
data2 = _mm_broadcastq_epi64(data2);
__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);
// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);
__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);
__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);
// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);
// keep the previous delta of delta
__m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta);
prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE);
opos += 2;
ipos += nbytes1 + nbytes2;
}
if (remainder > 0) {
uint64_t dd = 0;
uint8_t flags = input[ipos++];
int32_t nbytes = flags & INT8MASK(4);
int64_t deltaOfDelta = 0;
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
memcpy(&dd, input + ipos, nbytes);
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd);
}
ipos += nbytes;
if (opos == 0) {
ostream[opos++] = deltaOfDelta;
} else {
int64_t prevDeltaX = deltaOfDelta + prevDelta[1];
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}
#endif #endif
return 0; return 0;
} }

View File

@ -0,0 +1,94 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include <tcompression.h>
#include <random>
namespace {} // namespace
TEST(utilTest, decompress_test) {
int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400,
1700000500, 1700000600, 1700000700, 1700000800, 1700000900};
char* pOutput[10 * sizeof(int64_t)] = {0};
int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0);
char* decompOutput[10 * 8] = {0};
tsDecompressTimestamp(pOutput, len, 10, decompOutput, sizeof(int64_t)*10, ONE_STAGE_COMP, NULL, 0);
for(int32_t i = 0; i < 10; ++i) {
std::cout<< ((int64_t*)decompOutput)[i] << std::endl;
}
memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 10,
reinterpret_cast<char* const>(decompOutput), false);
for(int32_t i = 0; i < 10; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int64_t tsList1[7] = {1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000900};
int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7, ONE_STAGE_COMP, NULL, 0);
memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 7,
reinterpret_cast<char* const>(decompOutput), false);
for(int32_t i = 0; i < 7; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int64_t tsList2[1] = {1700000000};
int32_t len2 = tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1, ONE_STAGE_COMP, NULL, 0);
memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 1,
reinterpret_cast<char* const>(decompOutput), false);
for(int32_t i = 0; i < 1; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}
}
TEST(utilTest, decompress_perf_test) {
int32_t num = 10000;
int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
int64_t iniVal = 1700000000;
uint32_t v = 100;
for(int32_t i = 0; i < num; ++i) {
iniVal += taosRandR(&v)%10;
pList[i] = iniVal;
}
char* px = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
int32_t len = tsCompressTimestamp(pList, num * sizeof(int64_t), num, px, num, ONE_STAGE_COMP, NULL, 0);
char* pOutput = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
int64_t st = taosGetTimestampUs();
for(int32_t k = 0; k < 10000; ++k) {
tsDecompressTimestamp(px, len, num, pOutput, sizeof(int64_t) * num, ONE_STAGE_COMP, NULL, 0);
}
int64_t el1 = taosGetTimestampUs() - st;
std::cout << "soft decompress elapsed time:" << el1 << " us" << std::endl;
memset(pOutput, 0, num * sizeof(int64_t));
st = taosGetTimestampUs();
for(int32_t k = 0; k < 10000; ++k) {
tsDecompressTimestampAvx512(px, num, pOutput, false);
}
int64_t el2 = taosGetTimestampUs() - st;
std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl;
taosMemoryFree(pList);
taosMemoryFree(pOutput);
taosMemoryFree(px);
}