other: merge 3.0

This commit is contained in:
Haojun Liao 2024-01-11 09:51:12 +08:00
commit bff0ad89a4
12 changed files with 334 additions and 429 deletions

View File

@ -35,5 +35,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
#endif // TDENGINE_TQ_COMMON_H #endif // TDENGINE_TQ_COMMON_H

View File

@ -8334,7 +8334,7 @@ int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
return 0; return 0;
} }
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
@ -8356,11 +8356,16 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
} }
} }
} }
return 0;
}
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
@ -8402,7 +8407,15 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
} }
} }
} }
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
return 0;
}
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
}
return 0; return 0;
} }
@ -8418,7 +8431,7 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
} }
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeMqDataRsp(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1; if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) { if (pRsp->createTableNum) {
@ -8432,7 +8445,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
} }
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1; if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) { if (pRsp->createTableNum) {
@ -8447,6 +8460,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
taosArrayPush(pRsp->createTableReq, &pCreate); taosArrayPush(pRsp->createTableReq, &pCreate);
} }
} }
return 0; return 0;
} }

View File

@ -194,7 +194,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb, -1, pReq, len, true, true); return tqStreamTaskProcessDeployReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pReq, len, true, true);
} }
case TDMT_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
@ -203,6 +203,10 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
case TDMT_VND_STREAM_TASK_RESET: case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg); return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_TASK_PAUSE:
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_RESUME:
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
default: default:
ASSERT(0); ASSERT(0);
} }

View File

@ -1100,108 +1100,11 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
pMeta->vgId, pTask->hTaskInfo.id.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask, pMeta);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
int32_t vgId = pTq->pStreamMeta->vgId;
if (pTask == NULL) {
return -1;
}
streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (status == TASK_STATUS__UNINIT) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync(pTq, false);
} else {
streamSchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
}
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} }
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) {
return code;
}
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
}
return code;
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -901,3 +901,108 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
pMeta->vgId, pTask->hTaskInfo.id.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask, pMeta);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated, bool fromVnode) {
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
int32_t vgId = pMeta->vgId;
if (pTask == NULL) {
return -1;
}
streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (status == TASK_STATUS__UNINIT) {
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync((STQ*)handle, false);
} else {
streamSchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
}
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode){
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
return code;
}
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(handle, pHistoryTask, sversion, pReq->igUntreated, fromVnode);
}
return code;
}

View File

@ -1494,7 +1494,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pBlockScanInfo)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
@ -1513,7 +1513,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t minKey = 0; int64_t minKey = 0;
if (pReader->info.order == TSDB_ORDER_ASC) { if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1526,7 +1526,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} else { } else {
minKey = INT64_MIN; minKey = INT64_MIN;
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1630,91 +1630,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
// create local variable to hold the row value
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
// only stt block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key, static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@ -1820,7 +1735,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pBlockScanInfo)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
@ -1869,7 +1784,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
} else { } else {
@ -1886,7 +1801,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
} }
@ -2177,10 +2092,15 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.nextProcKey = pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true; hasData = true;
} else { } else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
} }
} else { } else {
pScanInfo->cleanSttBlocks = false;
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
} }

View File

@ -22,12 +22,6 @@
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#define INIT_TIMEWINDOW(_w) \
do { \
(_w)->skey = INT64_MAX; \
(_w)->ekey = INT64_MIN; \
} while (0);
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t order); int32_t order);
@ -165,6 +159,9 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
INIT_TIMEWINDOW(&pScanInfo->sttWindow); INIT_TIMEWINDOW(&pScanInfo->sttWindow);
INIT_TIMEWINDOW(&pScanInfo->filesetWindow); INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
pScanInfo->cleanSttBlocks = false;
pScanInfo->sttBlockReturned = false;
pUidList->tableUidList[j] = idList[j].uid; pUidList->tableUidList[j] = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {

View File

@ -26,6 +26,12 @@ extern "C" {
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define INIT_TIMEWINDOW(_w) \
do { \
(_w)->skey = INT64_MAX; \
(_w)->ekey = INT64_MIN; \
} while (0);
typedef enum { typedef enum {
READER_STATUS_SUSPEND = 0x1, READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2, READER_STATUS_NORMAL = 0x2,

View File

@ -379,16 +379,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->rid = taosAddRef(streamMetaId, pMeta); pMeta->rid = taosAddRef(streamMetaId, pMeta);
// set the attribute when running on Linux OS // set the attribute when running on Linux OS
#if defined LINUX
TdThreadRwlockAttr attr; TdThreadRwlockAttr attr;
taosThreadRwlockAttrInit(&attr); taosThreadRwlockAttrInit(&attr);
#ifdef LINUX
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
taosThreadRwlockInit(&pMeta->lock, &attr);
taosThreadRwlockAttrDestroy(&attr);
#endif #endif
taosThreadRwlockInit(&pMeta->lock, &attr);
taosThreadRwlockAttrDestroy(&attr);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid); metaRefMgtAdd(pMeta->vgId, pRid);

View File

@ -86,30 +86,29 @@ class TDTestCase(TBase):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
if eos.isArm64Cpu(): if eos.isArm64Cpu():
tdLog.success(f"{__file__} arm64 ignore executed") tdLog.success(f"{__file__} arm64 ignore executed")
return else:
# insert data
self.insertData()
# insert data # check insert data correct
self.insertData() self.checkInsertCorrect()
# check insert data correct # save
self.checkInsertCorrect() self.snapshotAgg()
# save # do action
self.snapshotAgg() self.doAction()
# do action # check save agg result correct
self.doAction() self.checkAggCorrect()
# check save agg result correct # check insert correct again
self.checkAggCorrect() self.checkInsertCorrect()
# check insert correct again # drop database and free s3 file
self.checkInsertCorrect() self.dropDb()
# drop database and free s3 file tdLog.success(f"{__file__} successfully executed")
self.dropDb()
tdLog.success(f"{__file__} successfully executed")

349
tests/script/coverage_test.sh Executable file → Normal file
View File

@ -12,7 +12,6 @@ fi
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
TDENGINE_DIR=/root/TDengine TDENGINE_DIR=/root/TDengine
JDBC_DIR=/root/taos-connector-jdbc JDBC_DIR=/root/taos-connector-jdbc
TAOSKEEPER_DIR=/root/taoskeeper
TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log
# Color setting # Color setting
@ -23,52 +22,52 @@ GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
function buildTDengine() { function buildTDengine() {
echo "check if TDengine need build" echo "check if TDengine need build"
cd $TDENGINE_DIR cd $TDENGINE_DIR
git remote prune origin > /dev/null git remote prune origin > /dev/null
git remote update > /dev/null git remote update > /dev/null
REMOTE_COMMIT=`git rev-parse --short remotes/origin/$branch` REMOTE_COMMIT=`git rev-parse --short remotes/origin/$branch`
LOCAL_COMMIT=`git rev-parse --short @` LOCAL_COMMIT=`git rev-parse --short @`
echo " LOCAL: $LOCAL_COMMIT" echo " LOCAL: $LOCAL_COMMIT"
echo "REMOTE: $REMOTE_COMMIT" echo "REMOTE: $REMOTE_COMMIT"
# reset counter # reset counter
lcov -d . --zerocounters lcov -d . --zerocounters
if [ "$LOCAL_COMMIT" == "$REMOTE_COMMIT" ]; then if [ "$LOCAL_COMMIT" == "$REMOTE_COMMIT" ]; then
echo "repo up-to-date" echo "repo up-to-date"
else else
echo "repo need to pull" echo "repo need to pull"
fi fi
git reset --hard git reset --hard
git checkout -- . git checkout -- .
git checkout $branch git checkout $branch
git checkout -- . git checkout -- .
git clean -dfx git clean -dfx
git pull git pull
[ -d $TDENGINE_DIR/debug ] || mkdir $TDENGINE_DIR/debug [ -d $TDENGINE_DIR/debug ] || mkdir $TDENGINE_DIR/debug
cd $TDENGINE_DIR/debug cd $TDENGINE_DIR/debug
echo "rebuild.." echo "rebuild.."
LOCAL_COMMIT=`git rev-parse --short @` LOCAL_COMMIT=`git rev-parse --short @`
rm -rf * rm -rf *
if [ "$branch" == "3.0" ]; then if [ "$branch" == "3.0" ]; then
echo "3.0 =============" echo "3.0 ============="
cmake -DCOVER=true -DBUILD_TEST=true -DBUILD_HTTP=false -DBUILD_TOOLS=true .. > /dev/null cmake -DCOVER=true -DBUILD_TEST=true -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_GEOS=true -DBUILD_CONTRIB=true ..
else else
cmake -DCOVER=true -DBUILD_TEST=true -DBUILD_TOOLS=true -DBUILD_HTTP=false .. > /dev/null cmake -DCOVER=true -DBUILD_TOOLS=true -DBUILD_HTTP=false .. > /dev/null
fi fi
make -j make -j
make install make install
} }
function runCasesOneByOne () { function runCasesOneByOne () {
while read -r line; do while read -r line; do
if [[ "$line" != "#"* ]]; then if [[ "$line" != "#"* ]]; then
cmd=`echo $line | cut -d',' -f 5` cmd=`echo $line | cut -d',' -f 5`
if [[ "$2" == "sim" ]] && [[ $line == *"script"* ]]; then if [[ "$2" == "sim" ]] && [[ $line == *"script"* ]]; then
case=`echo $cmd | cut -d' ' -f 3` case=`echo $cmd | cut -d' ' -f 3`
start_time=`date +%s` start_time=`date +%s`
@ -77,18 +76,7 @@ function runCasesOneByOne () {
|| echo -e "${RED}$case failed${NC}" | tee -a $TDENGINE_COVERAGE_REPORT || echo -e "${RED}$case failed${NC}" | tee -a $TDENGINE_COVERAGE_REPORT
end_time=`date +%s` end_time=`date +%s`
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a $TDENGINE_COVERAGE_REPORT echo execution time of $case was `expr $end_time - $start_time`s. | tee -a $TDENGINE_COVERAGE_REPORT
elif [[ "$2" == "system-test" ]] && [[ $line == *"system-test"* ]]; then elif [$line == *"$2"* ]; then
if [[ "$cmd" == *"pytest.sh"* ]]; then
cmd=`echo $cmd | cut -d' ' -f 2-20`
fi
case=`echo $cmd | cut -d' ' -f 4-20`
start_time=`date +%s`
date +%F\ %T | tee -a $TDENGINE_COVERAGE_REPORT && $cmd > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a $TDENGINE_COVERAGE_REPORT || \
echo -e "${RED}$case failed${NC}" | tee -a $TDENGINE_COVERAGE_REPORT
end_time=`date +%s`
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a $TDENGINE_COVERAGE_REPORT
elif [[ "$2" == "develop-test" ]] && [[ $line == *"develop-test"* ]]; then
if [[ "$cmd" == *"pytest.sh"* ]]; then if [[ "$cmd" == *"pytest.sh"* ]]; then
cmd=`echo $cmd | cut -d' ' -f 2-20` cmd=`echo $cmd | cut -d' ' -f 2-20`
fi fi
@ -101,209 +89,176 @@ function runCasesOneByOne () {
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a $TDENGINE_COVERAGE_REPORT echo execution time of $case was `expr $end_time - $start_time`s. | tee -a $TDENGINE_COVERAGE_REPORT
fi fi
fi fi
done < $1 done < $1
} }
function runUnitTest() { function runUnitTest() {
echo "=== Run unit test case ===" echo "=== Run unit test case ==="
echo " $TDENGINE_DIR/debug" echo " $TDENGINE_DIR/debug"
cd $TDENGINE_DIR/debug cd $TDENGINE_DIR/debug
ctest -j12 ctest -j12
echo "3.0 unit test done"
echo " $TDENGINE_DIR/tests/script/api"
cd $TDENGINE_DIR/tests/script/api
make clean && make
stopTaosd
stopTaosadapter
nohup taosd -c /etc/taos >> /dev/null 2>&1 &
./batchprepare 127.0.0.1
./dbTableRoute 127.0.0.1
./stopquery 127.0.0.1 demo t1
echo "3.0 unit test done"
} }
function runSimCases() { function runSimCases() {
echo "=== Run sim cases ===" echo "=== Run sim cases ==="
cd $TDENGINE_DIR/tests/script cd $TDENGINE_DIR/tests/script
runCasesOneByOne ../parallel_test/cases.task sim runCasesOneByOne $TDENGINE_DIR/tests/parallel_test/cases.task sim
totalSuccess=`grep 'sim success' $TDENGINE_COVERAGE_REPORT | wc -l` totalSuccess=`grep 'sim success' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalSuccess" -gt "0" ]; then if [ "$totalSuccess" -gt "0" ]; then
echo "### Total $totalSuccess SIM test case(s) succeed! ###" | tee -a $TDENGINE_COVERAGE_REPORT echo "### Total $totalSuccess SIM test case(s) succeed! ###" | tee -a $TDENGINE_COVERAGE_REPORT
fi fi
totalFailed=`grep 'sim failed\|fault' $TDENGINE_COVERAGE_REPORT | wc -l` totalFailed=`grep 'sim failed\|fault' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalFailed" -ne "0" ]; then if [ "$totalFailed" -ne "0" ]; then
echo "### Total $totalFailed SIM test case(s) failed! ###" | tee -a $TDENGINE_COVERAGE_REPORT echo "### Total $totalFailed SIM test case(s) failed! ###" | tee -a $TDENGINE_COVERAGE_REPORT
fi fi
} }
function runPythonCases() { function runPythonCases() {
echo "=== Run python cases ===" echo "=== Run python cases ==="
cd $TDENGINE_DIR/tests/parallel_test cd $TDENGINE_DIR/tests/parallel_test
sed -i '/compatibility.py/d' cases.task sed -i '/compatibility.py/d' cases.task
cd $TDENGINE_DIR/tests/system-test # army
runCasesOneByOne ../parallel_test/cases.task system-test cd $TDENGINE_DIR/tests/army
runCasesOneByOne ../parallel_test/cases.task army
cd $TDENGINE_DIR/tests/develop-test # system-test
runCasesOneByOne ../parallel_test/cases.task develop-test cd $TDENGINE_DIR/tests/system-test
runCasesOneByOne ../parallel_test/cases.task system-test
totalSuccess=`grep 'py success' $TDENGINE_COVERAGE_REPORT | wc -l` # develop-test
if [ "$totalSuccess" -gt "0" ]; then cd $TDENGINE_DIR/tests/develop-test
echo "### Total $totalSuccess python test case(s) succeed! ###" | tee -a $TDENGINE_COVERAGE_REPORT runCasesOneByOne ../parallel_test/cases.task develop-test
fi
totalFailed=`grep 'py failed\|fault' $TDENGINE_COVERAGE_REPORT | wc -l` totalSuccess=`grep 'py success' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalFailed" -ne "0" ]; then if [ "$totalSuccess" -gt "0" ]; then
echo "### Total $totalFailed python test case(s) failed! ###" | tee -a $TDENGINE_COVERAGE_REPORT echo "### Total $totalSuccess python test case(s) succeed! ###" | tee -a $TDENGINE_COVERAGE_REPORT
fi fi
totalFailed=`grep 'py failed\|fault' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalFailed" -ne "0" ]; then
echo "### Total $totalFailed python test case(s) failed! ###" | tee -a $TDENGINE_COVERAGE_REPORT
fi
} }
function runJDBCCases() { function runJDBCCases() {
echo "=== Run JDBC cases ===" echo "=== Run JDBC cases ==="
cd $JDBC_DIR cd $JDBC_DIR
git checkout -- . git checkout -- .
git reset --hard HEAD git reset --hard HEAD
git checkout main git checkout main
git pull git pull
stopTaosd stopTaosd
stopTaosadapter stopTaosadapter
nohup taosd -c /etc/taos >> /dev/null 2>&1 & nohup $TDENGINE_DIR/debug/build/bin/taosd -c /etc/taos >> /dev/null 2>&1 &
nohup taosadapter >> /dev/null 2>&1 & nohup taosadapter >> /dev/null 2>&1 &
mvn clean test > result.txt 2>&1 mvn clean test > result.txt 2>&1
summary=`grep "Tests run:" result.txt | tail -n 1` summary=`grep "Tests run:" result.txt | tail -n 1`
echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT
}
function runTaosKeeperCases() {
echo "=== Run taoskeeper cases ==="
cd $TAOSKEEPER_DIR
git checkout -- .
git reset --hard HEAD
git checkout master
git pull
stopTaosd
stopTaosadapter
taosd -c /etc/taos >> /dev/null 2>&1 &
taosadapter >> /dev/null 2>&1 &
go mod tidy && go test -v ./...
} }
function runTest() { function runTest() {
echo "run Test" echo "run Test"
cd $TDENGINE_DIR cd $TDENGINE_DIR
[ -d sim ] && rm -rf sim [ -d sim ] && rm -rf sim
[ -f $TDENGINE_COVERAGE_REPORT ] && rm $TDENGINE_COVERAGE_REPORT [ -f $TDENGINE_COVERAGE_REPORT ] && rm $TDENGINE_COVERAGE_REPORT
runUnitTest runUnitTest
runSimCases runSimCases
runPythonCases runPythonCases
runJDBCCases runJDBCCases
runTaosKeeperCases
stopTaosd stopTaosd
cd $TDENGINE_DIR/tests/script cd $TDENGINE_DIR/tests/script
find . -name '*.sql' | xargs rm -f find . -name '*.sql' | xargs rm -f
cd $TDENGINE_DIR/tests/pytest cd $TDENGINE_DIR/tests/pytest
find . -name '*.sql' | xargs rm -f find . -name '*.sql' | xargs rm -f
} }
function lcovFunc { function lcovFunc {
echo "collect data by lcov" echo "collect data by lcov"
cd $TDENGINE_DIR cd $TDENGINE_DIR
# collect data # collect data
lcov -d . --capture --rc lcov_branch_coverage=1 --rc genhtml_branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info lcov --ignore-errors negative --ignore-errors mismatch -d . --capture --rc lcov_branch_coverage=1 --rc branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info
# remove exclude paths # remove exclude paths
if [ "$branch" == "main" ] ; then lcov --remove coverage.info \
lcov --remove coverage.info \ '*/contrib/*' '*/tests/*' '*/test/*' '*/packaging/*' '*/taos-tools/*' '*/taosadapter/*' '*/TSZ/*' \
'*/contrib/*' '*/tests/*' '*/test/*' '*/tools/*' '*/libs/sync/*'\ '*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\ '*/texpr.c' '*/runUdf.c' '*/schDbg.c' '*/syncIO.c' '*/tdbOs.c' '*/pushServer.c' '*/osLz4.c'\
'*/texpr.c' '*/runUdf.c' '*/schDbg.c' '*/syncIO.c' '*/tdbOs.c' '*/pushServer.c' '*/osLz4.c'\ '*/tbase64.c' '*/tbuffer.c' '*/tdes.c' '*/texception.c' '*/examples/*' '*/tidpool.c' '*/tmempool.c'\
'*/tbase64.c' '*/tbuffer.c' '*/tdes.c' '*/texception.c' '*/tidpool.c' '*/tmempool.c'\ '*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c'\
'*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c' '*/shellAuto.c' '*/shellTire.c'\ '*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' \
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' '*/catalog.c'\ '*/shellAuto.c' '*/shellTire.c' '*/shellCommand.c'\
'*/tqSnapshot.c' '*/tsdbSnapshot.c''*/metaSnapshot.c' '*/smaSnapshot.c' '*/tqOffsetSnapshot.c'\ --rc branch_coverage=1 -o coverage.info
'*/vnodeSnapshot.c' '*/metaSnapshot.c' '*/tsdbSnapshot.c' '*/mndGrant.c' '*/mndSnode.c' '*/streamRecover.c'\
'*/osAtomic.c' '*/osDir.c' '*/osFile.c' '*/osMath.c' '*/osSignal.c' '*/osSleep.c' '*/osString.c' '*/osSystem.c'\
'*/osThread.c' '*/osTime.c' '*/osTimezone.c' \
--rc lcov_branch_coverage=1 -o coverage.info
else
lcov --remove coverage.info \
'*/tests/*' '*/test/*' '*/deps/*' '*/plugins/*' '*/taosdef.h' '*/ttype.h' '*/tarithoperator.c' '*/TSDBJNIConnector.c' '*/taosdemo.c' '*/clientJniConnector.c'\
--rc lcov_branch_coverage=1 -o coverage.info
fi
# generate result
echo "generate result"
lcov -l --rc branch_coverage=1 coverage.info | tee -a $TDENGINE_COVERAGE_REPORT
# generate result sed -i 's/\/root\/TDengine\/sql.c/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.c/g' coverage.info
echo "generate result" sed -i 's/\/root\/TDengine\/sql.y/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.y/g' coverage.info
lcov -l --rc lcov_branch_coverage=1 coverage.info | tee -a $TDENGINE_COVERAGE_REPORT
# push result to coveralls.io # push result to coveralls.io
echo "push result to coveralls.io" echo "push result to coveralls.io"
/usr/local/bin/coveralls-lcov coverage.info -t o7uY02qEAgKyJHrkxLGiCOTfL3IGQR2zm | tee -a $TDENGINE_COVERAGE_REPORT /usr/local/bin/coveralls-lcov coverage.info -t o7uY02qEAgKyJHrkxLGiCOTfL3IGQR2zm | tee -a $TDENGINE_COVERAGE_REPORT
#/root/pxiao/checkCoverageFile.sh -s $TDENGINE_DIR/source -f $TDENGINE_COVERAGE_REPORT #/root/pxiao/checkCoverageFile.sh -s $TDENGINE_DIR/source -f $TDENGINE_COVERAGE_REPORT
#cat /root/pxiao/fileListNoCoverage.log | tee -a $TDENGINE_COVERAGE_REPORT #cat /root/pxiao/fileListNoCoverage.log | tee -a $TDENGINE_COVERAGE_REPORT
cat $TDENGINE_COVERAGE_REPORT | grep "| 0.0%" | awk -F "%" '{print $1}' | awk -F "|" '{if($2==0.0)print $1}' | tee -a $TDENGINE_COVERAGE_REPORT cat $TDENGINE_COVERAGE_REPORT | grep "| 0.0%" | awk -F "%" '{print $1}' | awk -F "|" '{if($2==0.0)print $1}' | tee -a $TDENGINE_COVERAGE_REPORT
} }
function sendReport { function sendReport {
echo "send report" echo "send report"
receiver="develop@taosdata.com" receiver="develop@taosdata.com"
mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n" mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n"
cd $TDENGINE_DIR cd $TDENGINE_DIR
sed -i 's/\x1b\[[0-9;]*m//g' $TDENGINE_COVERAGE_REPORT sed -i 's/\x1b\[[0-9;]*m//g' $TDENGINE_COVERAGE_REPORT
BODY_CONTENT=`cat $TDENGINE_COVERAGE_REPORT` BODY_CONTENT=`cat $TDENGINE_COVERAGE_REPORT`
echo -e "from: <support@taosdata.com>\nto: ${receiver}\nsubject: Coverage test report ${branch} ${today}, commit ID: ${LOCAL_COMMIT}\n\n${today}:\n${BODY_CONTENT}" | \ echo -e "from: <support@taosdata.com>\nto: ${receiver}\nsubject: Coverage test report ${branch} ${today}, commit ID: ${LOCAL_COMMIT}\n\n${today}:\n${BODY_CONTENT}" | \
(cat - && uuencode $TDENGINE_COVERAGE_REPORT coverage-report-$today.log) | \ (cat - && uuencode $TDENGINE_COVERAGE_REPORT coverage-report-$today.log) | \
/usr/sbin/ssmtp "${receiver}" && echo "Report Sent!" /usr/sbin/ssmtp "${receiver}" && echo "Report Sent!"
} }
function stopTaosd { function stopTaosd {
echo "Stop taosd start" echo "Stop taosd start"
systemctl stop taosd systemctl stop taosd
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ] while [ -n "$PID" ]
do do
pkill -TERM -x taosd pkill -TERM -x taosd
sleep 1 sleep 1
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done done
echo "Stop tasod end" echo "Stop tasod end"
} }
function stopTaosadapter { function stopTaosadapter {
echo "Stop taosadapter" echo "Stop taosadapter"
systemctl stop taosadapter.service systemctl stop taosadapter.service
PID=`ps -ef|grep -w taosadapter | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosadapter | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ] while [ -n "$PID" ]
do do
pkill -TERM -x taosadapter pkill -TERM -x taosadapter
sleep 1 sleep 1
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done done
echo "Stop tasoadapter end" echo "Stop tasoadapter end"
} }
@ -318,7 +273,7 @@ buildTDengine
runTest runTest
lcovFunc lcovFunc
sendReport #sendReport
stopTaosd stopTaosd
date >> $WORK_DIR/cron.log date >> $WORK_DIR/cron.log

View File

@ -16,7 +16,7 @@ class TDTestCase:
def run(self): def run(self):
dbname = "db" dbname = "db"
tdSql.prepare() tdSql.prepare(dbname=dbname, drop=True, stt_trigger=1)
tdSql.execute(f'''create table {dbname}.ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute(f'''create table {dbname}.ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')