fix: invalid task status update issue

This commit is contained in:
dapan1121 2023-07-26 13:54:58 +08:00
parent fb63294289
commit 6a0061ccfb
13 changed files with 106 additions and 32 deletions

View File

@ -97,6 +97,8 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC
void dsEndPut(DataSinkHandle handle, uint64_t useconds);
void dsReset(DataSinkHandle handle);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle

View File

@ -20,6 +20,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
int32_t code = 0;
STsdbFD *pFD = NULL;
szPage = 4096; //debug only!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
*ppFD = NULL;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);

View File

@ -35,6 +35,7 @@ typedef struct SDataSinkManager {
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FReset)(struct SDataSinkHandle* pHandle);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
@ -43,6 +44,7 @@ typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
FEndPut fEndPut;
FReset fReset;
FGetDataLength fGetLen;
FGetDataBlock fGetData;
FDestroyDataSinker fDestroy;

View File

@ -41,7 +41,7 @@ typedef struct SGcFileCacheCtx {
uint32_t fileId;
SHashObj* pCacheFile;
int32_t baseNameLen;
char baseFilename[PATH_MAX];
char baseFilename[256];
} SGcFileCacheCtx;
typedef struct SGcDownstreamCtx {
@ -115,6 +115,7 @@ typedef struct SGcSessionCtx {
bool semInit;
tsem_t waitSem;
bool newFetch;
int64_t resRows;
} SGcSessionCtx;
typedef struct SGcBlkBufInfo {

View File

@ -156,6 +156,13 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDispatcher->mutex);
}
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
taosThreadMutexLock(&pDispatcher->mutex);
pDispatcher->queryEnd = false;
taosThreadMutexUnlock(&pDispatcher->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
@ -182,6 +189,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (NULL == pDispatcher->nextOutput.pData) {
@ -244,6 +252,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut;
dispatcher->sink.fReset = resetDispatcher;
dispatcher->sink.fGetLen = getDataLength;
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;

View File

@ -59,6 +59,13 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
return pHandleImpl->fEndPut(pHandleImpl, useconds);
}
void dsReset(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
if (pHandleImpl->fReset) {
return pHandleImpl->fReset(pHandleImpl);
}
}
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);

View File

@ -186,7 +186,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
taosWLockLatch(&pCache->dirtyLock);
pCache->blkCacheSize += pBufInfo->basic.bufSize;
qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);
qError("group cache total dirty block num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);
if (NULL == pCache->pDirtyHead) {
pCache->pDirtyHead = pBufInfo;
@ -259,7 +259,7 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB
return code;
}
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
void blockDataDeepClear(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
@ -276,6 +276,25 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0;
}
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
taosMemoryFreeClear(p->pData);
if (IS_VAR_DATA_TYPE(p->info.type)) {
taosMemoryFreeClear(p->varmeta.offset);
p->varmeta.length = 0;
p->varmeta.allocLen = 0;
} else {
taosMemoryFreeClear(p->nullbitmap);
}
}
pDataBlock->info.capacity = 0;
pDataBlock->info.rows = 0;
}
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
*ppDst = taosMemoryMalloc(sizeof(*pSrc));
if (NULL == *ppDst) {
@ -288,7 +307,7 @@ static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc)
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
blockDataDeepCleanup(*ppDst);
blockDataDeepClear(*ppDst);
return TSDB_CODE_SUCCESS;
}
@ -919,8 +938,12 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
}
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
if (NULL == ppRes) {
if (NULL == *ppRes) {
taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
qError("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
} else {
pSession->resRows += (*ppRes)->info.rows;
qError("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
}
return code;

View File

@ -43,6 +43,7 @@ typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
bool downstreamFetchDone[2];
int16_t downstreamResBlkId[2];
SSDataBlock* pLeft;
@ -65,6 +66,8 @@ typedef struct SMJoinOperatorInfo {
SSHashObj* rightBuildTable;
SMJoinRowCtx rowCtx;
int64_t resRows;
} SMJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
@ -457,7 +460,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
}
if (dataBlock == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
pJoinInfo->downstreamFetchDone[whichChild] = true;
endPos = -1;
break;
}
@ -654,11 +657,15 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
bool leftEmpty = false;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0);
pJoinInfo->leftPos = 0;
qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0);
if (!pJoinInfo->downstreamFetchDone[0]) {
pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0);
pJoinInfo->leftPos = 0;
qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0);
} else {
pJoinInfo->pLeft = NULL;
}
if (pJoinInfo->pLeft == NULL) {
if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) {
leftEmpty = true;
@ -670,10 +677,14 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1);
if (!pJoinInfo->downstreamFetchDone[1]) {
pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1);
pJoinInfo->rightPos = 0;
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
pJoinInfo->rightPos = 0;
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
} else {
pJoinInfo->pRight = NULL;
}
if (pJoinInfo->pRight == NULL) {
setMergeJoinDone(pOperator);
@ -753,15 +764,21 @@ void resetMergeJoinOperator(struct SOperatorInfo* pOperator) {
pJoinInfo->leftPos = 0;
pJoinInfo->pRight = NULL;
pJoinInfo->rightPos = 0;
pJoinInfo->downstreamFetchDone[0] = false;
pJoinInfo->downstreamFetchDone[1] = false;
pJoinInfo->resRows = 0;
pOperator->status = OP_OPENED;
}
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) {
qError("total merge join res rows:%" PRId64, pJoinInfo->resRows);
return NULL;
} else {
resetMergeJoinOperator(pOperator);
qError("start new merge join");
}
}
@ -785,7 +802,15 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break;
}
}
return (pRes->info.rows > 0) ? pRes : NULL;
if (pRes->info.rows > 0) {
pJoinInfo->resRows += pRes->info.rows;
qError("merge join returns res rows:%" PRId64, pRes->info.rows);
return pRes;
} else {
qError("total merge join res rows:%" PRId64, pJoinInfo->resRows);
return NULL;
}
}

View File

@ -805,10 +805,11 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
}
STableKeyInfo info = {.groupId = 0};
int32_t tableIdx = 0;
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t))) {
if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t))) {
if (TSDB_CODE_DUP_KEY == terrno) {
continue;
}
@ -821,6 +822,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tableIdx++;
qError("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo));
}

View File

@ -387,7 +387,7 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask);
int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
int32_t qwOpenRef(void);
@ -400,7 +400,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx);
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);

View File

@ -17,7 +17,7 @@ SQWDebug gQWDebug = {.lockEnable = false,
.sleepSimulate = false,
.forceStop = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask) {
if (!gQWDebug.statusEnable) {
return TSDB_CODE_SUCCESS;
}
@ -25,7 +25,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
int32_t code = 0;
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) {
if (dynamicTask || newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) {
*ignore = true;
return TSDB_CODE_SUCCESS;
}

View File

@ -45,7 +45,7 @@ char *qwBufStatusStr(int32_t bufStatus) {
return "UNKNOWN";
}
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status, bool dynamicTask) {
int32_t code = 0;
int8_t origStatus = 0;
bool ignore = false;
@ -53,7 +53,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
while (true) {
origStatus = atomic_load_8(&task->status);
QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore, dynamicTask));
if (ignore) {
break;
}
@ -381,7 +381,7 @@ _return:
QW_RET(code);
}
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
@ -389,7 +389,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status, dynamicTask));
_return:
@ -417,7 +417,7 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC));
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));

View File

@ -345,7 +345,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfRows);
if (!ctx->dynamicTask) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask);
}
if (NULL == rsp) {
@ -391,7 +391,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask);
break;
}
@ -496,6 +496,8 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
dsReset(ctx->sinkHandle);
qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg);
@ -544,7 +546,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(ctx->rspCode);
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask));
break;
}
case QW_PHASE_PRE_FETCH: {
@ -653,7 +655,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask);
ctx->queryGotData = true;
}
@ -679,7 +681,7 @@ _return:
}
if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
@ -926,7 +928,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} else if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
@ -989,7 +991,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask);
} else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropped = true;
@ -1007,7 +1009,7 @@ _return:
QW_UPDATE_RSP_CODE(ctx, code);
}
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
}
if (locked) {