Merge branch '3.0' into feat/TD-27463-3.0

This commit is contained in:
kailixu 2024-01-31 10:02:12 +08:00
commit 41072348b9
14 changed files with 29 additions and 16 deletions

View File

@ -206,6 +206,7 @@ typedef struct SDataBlockInfo {
int16_t hasVarCol; int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not int16_t dataLoad; // denote if the data is loaded or not
uint8_t scanFlag; uint8_t scanFlag;
bool blankFill;
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization int64_t version; // used for stream, and need serialization

View File

@ -114,6 +114,7 @@ typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index int32_t startRowIndex; // handle started row index
int64_t numOfRows; // the number of rows needs to be handled int64_t numOfRows; // the number of rows needs to be handled
bool blankFill; // fill blank data to block for empty table
int32_t numOfInputCols; // PTS is not included int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData *pPTS; // primary timestamp column

View File

@ -324,12 +324,13 @@ typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus; int8_t schedStatus;
int32_t schedIdleTime; // idle time before invoke again
int64_t lastExecTs; // last exec time stamp
int8_t statusBackup; int8_t statusBackup;
bool appendTranstateBlock; // has append the transfer state data block already int32_t schedIdleTime; // idle time before invoke again
int32_t timerActive; // timer is active int32_t timerActive; // timer is active
int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel; int32_t inScanHistorySentinel;
bool appendTranstateBlock; // has append the transfer state data block already
bool supplementaryWalscan; // complete the supplementary wal scan or not
} SStreamStatus; } SStreamStatus;
typedef struct SDataRange { typedef struct SDataRange {

View File

@ -599,7 +599,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished, pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
pDetail->newNumberFileset, pDetail->newFinished); pDetail->newNumberFileset, pDetail->newFinished);
if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished) //these 2 number will jump back after dnode restart, so < is not used here
if(pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
needSave = true; needSave = true;
} }

View File

@ -182,6 +182,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
mndTransDrop(pTrans);
return 0; return 0;
} }
@ -210,7 +211,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
}; };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name); mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
} }
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);

View File

@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version. // checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) { if (pChkInfo->checkpointId != 0) {
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} }

View File

@ -1080,7 +1080,7 @@ bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);

View File

@ -311,6 +311,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows; pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0; pInput->startRowIndex = 0;
pInput->blankFill = pBlock->info.blankFill;
// NOTE: the last parameter is the primary timestamp column // NOTE: the last parameter is the primary timestamp column
// todo: refactor this // todo: refactor this
@ -325,6 +326,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows; pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0; pInput->startRowIndex = 0;
pInput->blankFill = pBlock->info.blankFill;
code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows); code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -663,6 +663,8 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
pInfo->pResBlock->info.blankFill = false;
if (!pInfo->needCountEmptyTable) { if (!pInfo->needCountEmptyTable) {
pInfo->countState = TABLE_COUNT_STATE_END; pInfo->countState = TABLE_COUNT_STATE_END;
} else { } else {
@ -687,6 +689,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
pBlock->info.rows = 1; pBlock->info.rows = 1;
pBlock->info.id.uid = tbInfo->uid; pBlock->info.id.uid = tbInfo->uid;
pBlock->info.id.groupId = tbInfo->groupId; pBlock->info.id.groupId = tbInfo->groupId;
pBlock->info.blankFill = true;
// only one row: set all col data to null & hasNull // only one row: set all col data to null & hasNull
int32_t col_num = blockDataGetNumOfCols(pBlock); int32_t col_num = blockDataGetNumOfCols(pBlock);
@ -696,7 +699,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
} }
// set tag/tbname // set tag/tbname
doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(pBase, pBlock, pTaskInfo, 1);
return pBlock; return pBlock;
} }

View File

@ -499,6 +499,9 @@ static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
*/ */
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
if(1 == pInput->numOfRows && pInput->blankFill) {
return 0;
}
if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) { if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
} else { } else {
@ -6022,7 +6025,7 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
goto _group_key_over; goto _group_key_over;
} }
if (colDataIsNull_s(pInputCol, startIndex)) { if (pInputCol->pData == NULL || colDataIsNull_s(pInputCol, startIndex)) {
pInfo->isNull = true; pInfo->isNull = true;
pInfo->hasResult = true; pInfo->hasResult = true;
goto _group_key_over; goto _group_key_over;

View File

@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter",
pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer);
} else { } else {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
} }
@ -400,7 +402,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream // 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask); streamTaskSendCheckpointReq(pStreamTask);
// streamTaskResume(pStreamTask);
// 4. assign the status to the value that will be kept in disk // 4. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
@ -777,6 +778,8 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) { while (1) {
/*int32_t code = */ doStreamExecTask(pTask); /*int32_t code = */ doStreamExecTask(pTask);
// check if continue
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);

View File

@ -1322,7 +1322,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else { } else {
stDebug("vgId:%d meta-runlock completed", pMeta->vgId); // stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
} }
} }

View File

@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event); streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
int64_t initTs = pTask->execInfo.init; int64_t initTs = pTask->execInfo.init;
@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win); streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
} }
} }

View File

@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) { if (!pCur) {
return; return;
} }
qDebug("streamStateFreeCur");
streamStateResetCur(pCur); streamStateResetCur(pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
} }