refactor(tmq): add debug log

This commit is contained in:
Liu Jicong 2022-07-21 21:46:55 +08:00
parent b49c61cfb4
commit c82bda25e2
4 changed files with 63 additions and 37 deletions

View File

@ -108,6 +108,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
}
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset);
continue;

View File

@ -120,7 +120,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) {
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper) {
if (msg == NULL) {
// TODO create raw scan
return NULL;
@ -252,6 +253,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
uint64_t* uid = taosArrayGet(qa, i);
qDebug("table %ld added to task info", *uid);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
}

View File

@ -341,10 +341,21 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return -1;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
#ifndef NDEBUG
int64_t previousUid =
((STableKeyInfo*)taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pTableScanInfo->currentTable))->uid;
qDebug("switch to next table %ld (cursor %d), previous table %ld, %ld rows returned", uid,
pTableScanInfo->currentTable, previousUid, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);

View File

@ -376,9 +376,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
colDataDestroy(pColData);
}
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
@ -3724,7 +3722,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
static void freeItem(void* pItem) {
void** p = pItem;
if (*p != NULL) {
@ -4051,8 +4048,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval,
fillType, pColInfo, pInfo->primaryTsCol, id);
pInfo->pFillInfo =
taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id);
pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
@ -4066,7 +4063,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -4393,15 +4391,17 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) {
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* pUser) {
int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) {
pTaskInfo->code = code;
return NULL;
@ -4420,8 +4420,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) {
pTaskInfo->code = code;
return NULL;
@ -4433,8 +4434,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
SOperatorInfo* pOperator =
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
@ -4445,13 +4445,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) {
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) {
pTaskInfo->code = code;
return NULL;
}
#ifndef NDEBUG
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
qDebug("creating stream task: add table %ld", pKeyInfo->uid);
}
}
#endif
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
@ -4499,7 +4508,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
@ -4961,7 +4971,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;