From 3bcffcf1da35cce2894b734416f73d8dc37490d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Oct 2023 09:40:38 +0800 Subject: [PATCH] other: merge stream refactor from main. --- source/dnode/vnode/src/inc/tq.h | 6 + source/dnode/vnode/src/sma/smaTimeRange.c | 157 +++++------------- source/dnode/vnode/src/tq/tqSink.c | 150 ++++++++--------- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 6 - source/libs/stream/src/streamStart.c | 3 +- tests/pytest/util/dnodes.py | 2 + tests/system-test/0-others/timeRangeWise.py | 3 +- tests/system-test/test.py | 2 +- 9 files changed, 119 insertions(+), 212 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 9aed87d811..84f3b3e85a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -170,6 +170,12 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); +int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, const char* id); +int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); + +SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, + SSDataBlock* pDataBlock); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 0c37008344..f18843bc35 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -35,8 +35,8 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { return code; } -int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) { - int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg); +int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) { + int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg); return code; } @@ -109,7 +109,7 @@ _exit: * @param pMsg * @return int32_t */ -static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { +static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) { int32_t code = 0; int32_t lino = 0; SSmaCfg *pCfg = (SSmaCfg *)pMsg; @@ -118,7 +118,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char * if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { // create tsma meta in dstVgId - if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { + if (metaCreateTSma(SMA_META(pSma), ver, pCfg) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -130,7 +130,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char * pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; - if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { + if (metaCreateSTable(SMA_META(pSma), ver, &pReq) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -154,9 +154,8 @@ _exit: return code; } -int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, - SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName, - SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) { +int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, bool createTb, int64_t suid, + const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) { int32_t code = 0; int32_t lino = 0; void *pBuf = NULL; @@ -166,82 +165,23 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * SArray *createTbArray = NULL; SArray *pVals = NULL; - int32_t sz = taosArrayGetSize(pBlocks); + int32_t numOfBlocks = taosArrayGetSize(pBlocks); tagArray = taosArrayInit(1, sizeof(STagVal)); - createTbArray = taosArrayInit(sz, POINTER_BYTES); + createTbArray = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); - if(!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) { + if (!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) { code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; TSDB_CHECK_CODE(code, lino, _exit); } - // create table req - if (createTb) { - for (int32_t i = 0; i < sz; ++i) { - SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i); - SVCreateTbReq *pCreateTbReq = NULL; - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - taosArrayPush(createTbArray, &pCreateTbReq); - continue; - } - - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - }; - - // don't move to the end of loop as to destroy in the end of func when error occur - taosArrayPush(createTbArray, &pCreateTbReq); - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char *)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - - STag *pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pCreateTbReq->ctb.pTag = (uint8_t *)pTag; - - // set tag name - SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - if (pDataBlock->info.parTbName[0]) { - pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); - } else { - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } - } - } + SHashObj *pTableIndexMap = + taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); // SSubmitTbData req - for (int32_t i = 0; i < sz; ++i) { + for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; @@ -250,57 +190,38 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * continue; } - int32_t rows = pDataBlock->info.rows; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; + int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - SSubmitTbData tbData = {0}; + tbData.pCreateTbReq = + buildAutoCreateTableReq(stbFullName, suid, taosArrayGetSize(pDataBlock->pDataBlock) + 1, pDataBlock); - if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - tbData.suid = suid; - tbData.uid = 0; // uid is assigned by vnode - tbData.sver = pTSchema->version; + { + uint64_t groupId = pDataBlock->info.id.groupId; - if (createTb) { - tbData.pCreateTbReq = taosArrayGetP(createTbArray, i); - if (tbData.pCreateTbReq) tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - } + int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); + if (index == NULL) { // no data yet, append it + code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + if (code != TSDB_CODE_SUCCESS) { + continue; + } - if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { - taosArrayDestroy(tbData.aRowP); - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + taosArrayPush(pReq->aSubmitTbData, &tbData); - for (int32_t j = 0; j < rows; ++j) { - taosArrayClear(pVals); - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn *pCol = &pTSchema->columns[k]; - SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k); - if (colDataIsNull_s(pColData, j)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); - } else { - void *data = colDataGetData(pColData, j); - if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } else { - SValue sv; - memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } + int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; + taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); + } else { + code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index); + code = doMergeExistedRows(pExisted, &tbData, "id"); + if (code != TSDB_CODE_SUCCESS) { + continue; } } - SRow *pRow = NULL; - if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - TSDB_CHECK_CODE(code, lino, _exit); - } - taosArrayPush(tbData.aRowP, &pRow); } taosArrayPush(pReq->aSubmitTbData, &tbData); @@ -442,7 +363,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char void *pSubmitReq = NULL; int32_t contLen = 0; - code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, + code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5dafe6a4a0..65484f4842 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "tcommon.h" #include "tmsg.h" #include "tq.h" @@ -28,19 +29,19 @@ static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData); -static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData); static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); -static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); +static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid); static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); -static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); -static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock); +static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); +static SArray* createDefaultTagColName(); +static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, + int64_t gid); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -138,61 +139,68 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } +int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) { + pCreateTableReq->flags = 0; + pCreateTableReq->type = TSDB_CHILD_TABLE; + pCreateTableReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); + + pCreateTableReq->ctb.tagNum = numOfTags; + return TSDB_CODE_SUCCESS; +} + +SArray* createDefaultTagColName() { + SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; + taosArrayPush(pTagColNameList, tagNameStr); + return pTagColNameList; +} + +void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, + int64_t gid) { + if (pDataBlock->info.parTbName[0]) { + pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + } else { + pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); + } +} + static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { tqDebug("s-task:%s build create table msg", pTask->id.idStr); STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t rows = pDataBlock->info.rows; - SArray* tagArray = NULL; + SArray* tagArray = taosArrayInit(4, sizeof(STagVal));; int32_t code = 0; SVCreateTbBatchReq reqs = {0}; - SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); if (NULL == reqs.pArray) { + tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno)); goto _end; } for (int32_t rowId = 0; rowId < rows; rowId++) { SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); - if (size == 2) { - tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } + int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1); + initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags); + taosArrayClear(tagArray); + + if (size == 2) { STagVal tagVal = { .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; taosArrayPush(tagArray, &tagVal); - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; + pCreateTbReq->ctb.tagName = createDefaultTagColName(); } else { - tagArray = taosArrayInit(size - 1, sizeof(STagVal)); - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); @@ -209,29 +217,25 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S taosArrayPush(tagArray, &tagVal); } } - pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); + tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { + if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set table name - if (!pDataBlock->info.parTbName[0]) { + uint64_t gid = pDataBlock->info.id.groupId; + if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); - } else { - pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); + ASSERT(gid == *(int64_t*)pGpIdData); } + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid); + taosArrayPush(reqs.pArray, pCreateTbReq); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); } @@ -390,10 +394,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam return true; } -SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { - char* ctbName = pDataBlock->info.parTbName; - - SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)); +SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { + SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pCreateTbReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -408,42 +410,25 @@ SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t return NULL; } - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); + initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1); STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); + tTagNew(tagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag); taosArrayDestroy(tagArray); - if (pTag == NULL) { + if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); taosMemoryFreeClear(pCreateTbReq); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char k[TSDB_COL_NAME_LEN] = "group_id"; - taosArrayPush(tagName, k); - - pCreateTbReq->ctb.tagName = tagName; + pCreateTbReq->ctb.tagName = createDefaultTagColName();; // set table name - pCreateTbReq->name = taosStrdup(ctbName); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); return pCreateTbReq; } @@ -514,7 +499,7 @@ int32_t tsAscendingSortFn(const void* p1, const void* p2) { } } -int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { +int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { int32_t numOfRows = pDataBlock->info.rows; int32_t code = TSDB_CODE_SUCCESS; @@ -724,17 +709,16 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return TSDB_CODE_SUCCESS; } -int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData) { - int32_t numOfRows = pDataBlock->info.rows; - const char* id = pTask->id.idStr; +int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, const char* id) { + int32_t numOfRows = pDataBlock->info.rows; tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, - id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid); + id, blockIndex + 1, numOfRows, suid); char* dstTableName = pDataBlock->info.parTbName; // convert all rows - int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id); + int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, id); if (code != TSDB_CODE_SUCCESS) { tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno)); return code; @@ -800,7 +784,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -847,7 +831,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -857,7 +841,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 307e1c47c9..fcb61dc892 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -62,7 +62,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { int32_t nextTrigger = (int32_t)pTask->info.triggerParam; int8_t status = atomic_load_8(&pTask->schedInfo.status); - stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); + stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { stDebug("s-task:%s jump out of schedTimer", id); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ce4feb38eb..f53f3d50a8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1992,14 +1992,12 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke return 0; } int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { - stDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; rocksdb_iter_prev(pCur->iter); return 0; } int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - stDebug("streamStateGetKVByCur_rocksdb"); if (!pCur) return -1; SStateKey tkey; SStateKey* pKtmp = &tkey; @@ -2024,7 +2022,6 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons return -1; } SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { - stDebug("streamStateGetAndCheckCur_rocksdb"); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); if (pCur) { int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); @@ -2035,7 +2032,6 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey } SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - stDebug("streamStateSeekKeyNext_rocksdb"); SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; @@ -2074,7 +2070,6 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin } SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { - stDebug("streamStateGetCur_rocksdb"); int32_t code = 0; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; @@ -2115,7 +2110,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { } SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - stDebug("streamStateGetCur_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SStreamStateCur* pCur = createStreamStateCursor(); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9ebd617be0..965f116572 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -407,8 +407,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); - - taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer); + pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); } } diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index c4fc1ce654..92577744ac 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -135,6 +135,8 @@ class TDDnode: "vDebugFlag": "143", "tqDebugFlag": "143", "cDebugFlag": "143", + "stDebugFlag": "143", + "smaDebugFlag": "143", "jniDebugFlag": "143", "qDebugFlag": "143", "rpcDebugFlag": "143", diff --git a/tests/system-test/0-others/timeRangeWise.py b/tests/system-test/0-others/timeRangeWise.py index 5ef5aa4a75..7a258f5dd3 100644 --- a/tests/system-test/0-others/timeRangeWise.py +++ b/tests/system-test/0-others/timeRangeWise.py @@ -23,7 +23,8 @@ from util.cases import * from util.sql import * class TDTestCase: - + updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, + 'asynclog': 0, 'stdebugflag':135} # random string def random_string(self, count): letters = string.ascii_letters diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 6813530a5c..81f98fea22 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -398,7 +398,7 @@ if __name__ == "__main__": tdDnodes.setValgrind(valgrind) tdDnodes.stopAll() for dnode in tdDnodes.dnodes: - tdDnodes.deploy(dnode.index,{}) + tdDnodes.deploy(dnode.index, updateCfgDict) for dnode in tdDnodes.dnodes: tdDnodes.starttaosd(dnode.index) tdCases.logSql(logSql)