diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index 9aed87d811..84f3b3e85a 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -170,6 +170,12 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
+int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
+ SSubmitTbData* pTableData, const char* id);
+int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
+
+SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
+ SSDataBlock* pDataBlock);
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c
index 0c37008344..f18843bc35 100644
--- a/source/dnode/vnode/src/sma/smaTimeRange.c
+++ b/source/dnode/vnode/src/sma/smaTimeRange.c
@@ -35,8 +35,8 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
return code;
}
-int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
- int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg);
+int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
+ int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
return code;
}
@@ -109,7 +109,7 @@ _exit:
* @param pMsg
* @return int32_t
*/
-static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
+static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
int32_t code = 0;
int32_t lino = 0;
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
@@ -118,7 +118,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
// create tsma meta in dstVgId
- if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
+ if (metaCreateTSma(SMA_META(pSma), ver, pCfg) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
@@ -130,7 +130,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag;
- if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
+ if (metaCreateSTable(SMA_META(pSma), ver, &pReq) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
@@ -154,9 +154,8 @@ _exit:
return code;
}
-int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema,
- SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName,
- SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
+int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, bool createTb, int64_t suid,
+ const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
int32_t code = 0;
int32_t lino = 0;
void *pBuf = NULL;
@@ -166,82 +165,23 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SArray *createTbArray = NULL;
SArray *pVals = NULL;
- int32_t sz = taosArrayGetSize(pBlocks);
+ int32_t numOfBlocks = taosArrayGetSize(pBlocks);
tagArray = taosArrayInit(1, sizeof(STagVal));
- createTbArray = taosArrayInit(sz, POINTER_BYTES);
+ createTbArray = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
- if(!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) {
+ if (!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
- // create table req
- if (createTb) {
- for (int32_t i = 0; i < sz; ++i) {
- SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
- SVCreateTbReq *pCreateTbReq = NULL;
- if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
- taosArrayPush(createTbArray, &pCreateTbReq);
- continue;
- }
-
- if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- TSDB_CHECK_CODE(code, lino, _exit);
- };
-
- // don't move to the end of loop as to destroy in the end of func when error occur
- taosArrayPush(createTbArray, &pCreateTbReq);
-
- // set const
- pCreateTbReq->flags = 0;
- pCreateTbReq->type = TSDB_CHILD_TABLE;
- pCreateTbReq->ctb.suid = suid;
-
- // set super table name
- SName name = {0};
- tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
- pCreateTbReq->ctb.stbName = taosStrdup((char *)tNameGetTableName(&name)); // taosStrdup(stbFullName);
-
- // set tag content
- taosArrayClear(tagArray);
- STagVal tagVal = {
- .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
- .type = TSDB_DATA_TYPE_UBIGINT,
- .i64 = (int64_t)pDataBlock->info.id.groupId,
- };
- taosArrayPush(tagArray, &tagVal);
- pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
-
- STag *pTag = NULL;
- tTagNew(tagArray, 1, false, &pTag);
- if (pTag == NULL) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- pCreateTbReq->ctb.pTag = (uint8_t *)pTag;
-
- // set tag name
- SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
- char tagNameStr[TSDB_COL_NAME_LEN] = {0};
- strcpy(tagNameStr, "group_id");
- taosArrayPush(tagName, tagNameStr);
- pCreateTbReq->ctb.tagName = tagName;
-
- // set table name
- if (pDataBlock->info.parTbName[0]) {
- pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
- } else {
- pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
- }
- }
- }
+ SHashObj *pTableIndexMap =
+ taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// SSubmitTbData req
- for (int32_t i = 0; i < sz; ++i) {
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
@@ -250,57 +190,38 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue;
}
- int32_t rows = pDataBlock->info.rows;
+ SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
+ int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
- SSubmitTbData tbData = {0};
+ tbData.pCreateTbReq =
+ buildAutoCreateTableReq(stbFullName, suid, taosArrayGetSize(pDataBlock->pDataBlock) + 1, pDataBlock);
- if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
- code = terrno;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- tbData.suid = suid;
- tbData.uid = 0; // uid is assigned by vnode
- tbData.sver = pTSchema->version;
+ {
+ uint64_t groupId = pDataBlock->info.id.groupId;
- if (createTb) {
- tbData.pCreateTbReq = taosArrayGetP(createTbArray, i);
- if (tbData.pCreateTbReq) tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
- }
+ int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
+ if (index == NULL) { // no data yet, append it
+ code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
+ if (code != TSDB_CODE_SUCCESS) {
+ continue;
+ }
- if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
- taosArrayDestroy(tbData.aRowP);
- code = terrno;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
+ taosArrayPush(pReq->aSubmitTbData, &tbData);
- for (int32_t j = 0; j < rows; ++j) {
- taosArrayClear(pVals);
- for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
- const STColumn *pCol = &pTSchema->columns[k];
- SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k);
- if (colDataIsNull_s(pColData, j)) {
- SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
- taosArrayPush(pVals, &cv);
- } else {
- void *data = colDataGetData(pColData, j);
- if (IS_STR_DATA_TYPE(pCol->type)) {
- SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
- SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
- taosArrayPush(pVals, &cv);
- } else {
- SValue sv;
- memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
- SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
- taosArrayPush(pVals, &cv);
- }
+ int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
+ taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
+ } else {
+ code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
+ if (code != TSDB_CODE_SUCCESS) {
+ continue;
+ }
+
+ SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
+ code = doMergeExistedRows(pExisted, &tbData, "id");
+ if (code != TSDB_CODE_SUCCESS) {
+ continue;
}
}
- SRow *pRow = NULL;
- if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
- tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- taosArrayPush(tbData.aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, &tbData);
@@ -442,7 +363,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
void *pSubmitReq = NULL;
int32_t contLen = 0;
- code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
+ code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
TSDB_CHECK_CODE(code, lino, _exit);
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 5dafe6a4a0..65484f4842 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include "tcommon.h"
#include "tmsg.h"
#include "tq.h"
@@ -28,19 +29,19 @@ static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData);
-static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
- SSubmitTbData* pTableData);
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
-static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
+static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
-static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
-static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock);
+static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
+static SArray* createDefaultTagColName();
+static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
+ int64_t gid);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
@@ -138,61 +139,68 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS;
}
+int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
+ pCreateTableReq->flags = 0;
+ pCreateTableReq->type = TSDB_CHILD_TABLE;
+ pCreateTableReq->ctb.suid = suid;
+
+ // set super table name
+ SName name = {0};
+ tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
+ pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
+
+ pCreateTableReq->ctb.tagNum = numOfTags;
+ return TSDB_CODE_SUCCESS;
+}
+
+SArray* createDefaultTagColName() {
+ SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN);
+ char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
+ taosArrayPush(pTagColNameList, tagNameStr);
+ return pTagColNameList;
+}
+
+void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
+ int64_t gid) {
+ if (pDataBlock->info.parTbName[0]) {
+ pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
+ } else {
+ pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid);
+ }
+}
+
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
- SArray* tagArray = NULL;
+ SArray* tagArray = taosArrayInit(4, sizeof(STagVal));;
int32_t code = 0;
SVCreateTbBatchReq reqs = {0};
-
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) {
+ tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
- // set const
- pCreateTbReq->flags = 0;
- pCreateTbReq->type = TSDB_CHILD_TABLE;
- pCreateTbReq->ctb.suid = suid;
-
- // set super table name
- SName name = {0};
- tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
- pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
-
- // set tag content
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
- if (size == 2) {
- tagArray = taosArrayInit(1, sizeof(STagVal));
- if (!tagArray) {
- tdDestroySVCreateTbReq(pCreateTbReq);
- goto _end;
- }
+ int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
+ initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
+ taosArrayClear(tagArray);
+
+ if (size == 2) {
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
-
- // set tag name
- SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
- char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
- taosArrayPush(tagName, tagNameStr);
- pCreateTbReq->ctb.tagName = tagName;
+ pCreateTbReq->ctb.tagName = createDefaultTagColName();
} else {
- tagArray = taosArrayInit(size - 1, sizeof(STagVal));
- if (!tagArray) {
- tdDestroySVCreateTbReq(pCreateTbReq);
- goto _end;
- }
-
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
@@ -209,29 +217,25 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
taosArrayPush(tagArray, &tagVal);
}
}
- pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
- STag* pTag = NULL;
- tTagNew(tagArray, 1, false, &pTag);
+ tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
tagArray = taosArrayDestroy(tagArray);
- if (pTag == NULL) {
+ if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
- pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
-
- // set table name
- if (!pDataBlock->info.parTbName[0]) {
+ uint64_t gid = pDataBlock->info.id.groupId;
+ if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
- pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
- } else {
- pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
+ ASSERT(gid == *(int64_t*)pGpIdData);
}
+ setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid);
+
taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
}
@@ -390,10 +394,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
return true;
}
-SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
- char* ctbName = pDataBlock->info.parTbName;
-
- SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
+SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
+ SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@@ -408,42 +410,25 @@ SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t
return NULL;
}
- // set const
- pCreateTbReq->flags = 0;
- pCreateTbReq->type = TSDB_CHILD_TABLE;
- pCreateTbReq->ctb.suid = suid;
-
- // set super table name
- SName name = {0};
- tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
- pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
+ initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
- pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
- STag* pTag = NULL;
- tTagNew(tagArray, 1, false, &pTag);
+ tTagNew(tagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag);
taosArrayDestroy(tagArray);
- if (pTag == NULL) {
+ if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
-
- // set tag name
- SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
- char k[TSDB_COL_NAME_LEN] = "group_id";
- taosArrayPush(tagName, k);
-
- pCreateTbReq->ctb.tagName = tagName;
+ pCreateTbReq->ctb.tagName = createDefaultTagColName();;
// set table name
- pCreateTbReq->name = taosStrdup(ctbName);
+ setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId);
return pCreateTbReq;
}
@@ -514,7 +499,7 @@ int32_t tsAscendingSortFn(const void* p1, const void* p2) {
}
}
-int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
+int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS;
@@ -724,17 +709,16 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
return TSDB_CODE_SUCCESS;
}
-int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
- SSubmitTbData* pTableData) {
- int32_t numOfRows = pDataBlock->info.rows;
- const char* id = pTask->id.idStr;
+int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
+ SSubmitTbData* pTableData, const char* id) {
+ int32_t numOfRows = pDataBlock->info.rows;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
- id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid);
+ id, blockIndex + 1, numOfRows, suid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
- int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id);
+ int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, id);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
return code;
@@ -800,7 +784,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
}
- code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
+ code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
@@ -847,7 +831,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
}
- code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
+ code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
@@ -857,7 +841,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
} else {
- code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
+ code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 307e1c47c9..fcb61dc892 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -62,7 +62,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
int8_t status = atomic_load_8(&pTask->schedInfo.status);
- stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
+ stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s jump out of schedTimer", id);
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index ce4feb38eb..f53f3d50a8 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -1992,14 +1992,12 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
return 0;
}
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
- stDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
- stDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1;
SStateKey tkey;
SStateKey* pKtmp = &tkey;
@@ -2024,7 +2022,6 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
return -1;
}
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
- stDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
@@ -2035,7 +2032,6 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
}
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
- stDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
@@ -2074,7 +2070,6 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
- stDebug("streamStateGetCur_rocksdb");
int32_t code = 0;
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
@@ -2115,7 +2110,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
}
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
- stDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor();
diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c
index 9ebd617be0..965f116572 100644
--- a/source/libs/stream/src/streamStart.c
+++ b/source/libs/stream/src/streamStart.c
@@ -407,8 +407,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
-
- taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer);
+ pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
}
}
diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py
index c4fc1ce654..92577744ac 100644
--- a/tests/pytest/util/dnodes.py
+++ b/tests/pytest/util/dnodes.py
@@ -135,6 +135,8 @@ class TDDnode:
"vDebugFlag": "143",
"tqDebugFlag": "143",
"cDebugFlag": "143",
+ "stDebugFlag": "143",
+ "smaDebugFlag": "143",
"jniDebugFlag": "143",
"qDebugFlag": "143",
"rpcDebugFlag": "143",
diff --git a/tests/system-test/0-others/timeRangeWise.py b/tests/system-test/0-others/timeRangeWise.py
index 5ef5aa4a75..7a258f5dd3 100644
--- a/tests/system-test/0-others/timeRangeWise.py
+++ b/tests/system-test/0-others/timeRangeWise.py
@@ -23,7 +23,8 @@ from util.cases import *
from util.sql import *
class TDTestCase:
-
+ updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
+ 'asynclog': 0, 'stdebugflag':135}
# random string
def random_string(self, count):
letters = string.ascii_letters
diff --git a/tests/system-test/test.py b/tests/system-test/test.py
index 6813530a5c..81f98fea22 100644
--- a/tests/system-test/test.py
+++ b/tests/system-test/test.py
@@ -398,7 +398,7 @@ if __name__ == "__main__":
tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll()
for dnode in tdDnodes.dnodes:
- tdDnodes.deploy(dnode.index,{})
+ tdDnodes.deploy(dnode.index, updateCfgDict)
for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)