fix(sma): fix bugs in sma and do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-24 09:40:38 +08:00
parent a882c639be
commit 7a8b046a35
10 changed files with 118 additions and 213 deletions

View File

@ -170,6 +170,12 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t type, int64_t sver, int64_t ever); int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader); void tqUpdateNodeStage(STQ* pTq, bool isLeader);
int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData, const char* id);
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -35,8 +35,8 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
return code; return code;
} }
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) { int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg); int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
return code; return code;
} }
@ -109,7 +109,7 @@ _exit:
* @param pMsg * @param pMsg
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSmaCfg *pCfg = (SSmaCfg *)pMsg; SSmaCfg *pCfg = (SSmaCfg *)pMsg;
@ -118,7 +118,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
// create tsma meta in dstVgId // create tsma meta in dstVgId
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { if (metaCreateTSma(SMA_META(pSma), ver, pCfg) < 0) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -130,7 +130,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
pReq.schemaRow = pCfg->schemaRow; pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag; pReq.schemaTag = pCfg->schemaTag;
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { if (metaCreateSTable(SMA_META(pSma), ver, &pReq) < 0) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -154,9 +154,8 @@ _exit:
return code; return code;
} }
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, bool createTb, int64_t suid,
SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName, const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
void *pBuf = NULL; void *pBuf = NULL;
@ -166,10 +165,10 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SArray *createTbArray = NULL; SArray *createTbArray = NULL;
SArray *pVals = NULL; SArray *pVals = NULL;
int32_t sz = taosArrayGetSize(pBlocks); int32_t numOfBlocks = taosArrayGetSize(pBlocks);
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
createTbArray = taosArrayInit(sz, POINTER_BYTES); createTbArray = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
@ -178,70 +177,11 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// create table req SHashObj *pTableIndexMap =
if (createTb) { taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
SVCreateTbReq *pCreateTbReq = NULL;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
taosArrayPush(createTbArray, &pCreateTbReq);
continue;
}
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
};
// don't move to the end of loop as to destroy in the end of func when error occur
taosArrayPush(createTbArray, &pCreateTbReq);
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char *)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag *pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCreateTbReq->ctb.pTag = (uint8_t *)pTag;
// set tag name
SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
} else {
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
}
}
// SSubmitTbData req // SSubmitTbData req
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid; pDeleteReq->suid = suid;
@ -250,58 +190,39 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue; continue;
} }
int32_t rows = pDataBlock->info.rows; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
SSubmitTbData tbData = {0}; tbData.pCreateTbReq =
buildAutoCreateTableReq(stbFullName, suid, taosArrayGetSize(pDataBlock->pDataBlock) + 1, pDataBlock);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { {
code = terrno; uint64_t groupId = pDataBlock->info.id.groupId;
TSDB_CHECK_CODE(code, lino, _exit);
}
tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version;
if (createTb) { int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
tbData.pCreateTbReq = taosArrayGetP(createTbArray, i); if (index == NULL) { // no data yet, append it
if (tbData.pCreateTbReq) tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
if (code != TSDB_CODE_SUCCESS) {
continue;
} }
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { taosArrayPush(pReq->aSubmitTbData, &tbData);
taosArrayDestroy(tbData.aRowP);
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t j = 0; j < rows; ++j) { int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
taosArrayClear(pVals); taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn *pCol = &pTSchema->columns[k];
SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else { } else {
void *data = colDataGetData(pColData, j); code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
if (IS_STR_DATA_TYPE(pCol->type)) { if (code != TSDB_CODE_SUCCESS) {
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value continue;
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); }
taosArrayPush(pVals, &cv);
} else { SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
SValue sv; code = doMergeExistedRows(pExisted, &tbData, "id");
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); if (code != TSDB_CODE_SUCCESS) {
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); continue;
taosArrayPush(pVals, &cv);
} }
} }
} }
SRow *pRow = NULL;
if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosArrayPush(tbData.aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, &tbData); taosArrayPush(pReq->aSubmitTbData, &tbData);
} }
@ -442,7 +363,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
void *pSubmitReq = NULL; void *pSubmitReq = NULL;
int32_t contLen = 0; int32_t contLen = 0;
code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen); pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -1831,7 +1831,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
bool allStopped = false;
SStreamTaskNodeUpdateMsg req = {0}; SStreamTaskNodeUpdateMsg req = {0};

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <common/tmsg.h>
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tq.h" #include "tq.h"
@ -28,19 +29,19 @@ static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData); SSubmitTbData* pTableData);
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData);
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid); int64_t suid);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid); const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
@ -138,61 +139,68 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
pCreateTableReq->flags = 0;
pCreateTableReq->type = TSDB_CHILD_TABLE;
pCreateTableReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
pCreateTableReq->ctb.tagNum = numOfTags;
return TSDB_CODE_SUCCESS;
}
SArray* createDefaultTagColName() {
SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(pTagColNameList, tagNameStr);
return pTagColNameList;
}
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid) {
if (pDataBlock->info.parTbName[0]) {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
} else {
pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid);
}
}
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) { int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr); tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
SArray* tagArray = NULL; SArray* tagArray = taosArrayInit(4, sizeof(STagVal));;
int32_t code = 0; int32_t code = 0;
SVCreateTbBatchReq reqs = {0}; SVCreateTbBatchReq reqs = {0};
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) { if (NULL == reqs.pArray) {
tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
goto _end; goto _end;
} }
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
if (size == 2) { int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
taosArrayClear(tagArray);
if (size == 2) {
STagVal tagVal = { STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagName = createDefaultTagColName();
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
} else { } else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
@ -209,29 +217,25 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
} }
} }
pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
STag* pTag = NULL; tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; uint64_t gid = pDataBlock->info.id.groupId;
if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
// set table name
if (!pDataBlock->info.parTbName[0]) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); ASSERT(gid == *(int64_t*)pGpIdData);
} else {
pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
} }
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid);
taosArrayPush(reqs.pArray, pCreateTbReq); taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
} }
@ -390,10 +394,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
return true; return true;
} }
SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
char* ctbName = pDataBlock->info.parTbName; SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
if (pCreateTbReq == NULL) { if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -408,42 +410,25 @@ SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t
return NULL; return NULL;
} }
// set const initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL; tTagNew(tagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag);
tTagNew(tagArray, 1, false, &pTag);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq); taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; pCreateTbReq->ctb.tagName = createDefaultTagColName();;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char k[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(tagName, k);
pCreateTbReq->ctb.tagName = tagName;
// set table name // set table name
pCreateTbReq->name = taosStrdup(ctbName); setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId);
return pCreateTbReq; return pCreateTbReq;
} }
@ -514,7 +499,7 @@ int32_t tsAscendingSortFn(const void* p1, const void* p2) {
} }
} }
int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
int32_t numOfRows = pDataBlock->info.rows; int32_t numOfRows = pDataBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -724,17 +709,16 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData) { SSubmitTbData* pTableData, const char* id) {
int32_t numOfRows = pDataBlock->info.rows; int32_t numOfRows = pDataBlock->info.rows;
const char* id = pTask->id.idStr;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid); id, blockIndex + 1, numOfRows, suid);
char* dstTableName = pDataBlock->info.parTbName; char* dstTableName = pDataBlock->info.parTbName;
// convert all rows // convert all rows
int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id); int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno)); tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
return code; return code;
@ -800,7 +784,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue; continue;
} }
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }
@ -847,7 +831,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue; continue;
} }
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }
@ -857,7 +841,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
} else { } else {
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }

View File

@ -62,7 +62,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
int32_t nextTrigger = (int32_t)pTask->info.triggerParam; int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
int8_t status = atomic_load_8(&pTask->schedInfo.status); int8_t status = atomic_load_8(&pTask->schedInfo.status);
stDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s jump out of schedTimer", id); stDebug("s-task:%s jump out of schedTimer", id);

View File

@ -2006,14 +2006,12 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
return 0; return 0;
} }
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
stDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1; if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
return 0; return 0;
} }
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
stDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1; if (!pCur) return -1;
SStateKey tkey; SStateKey tkey;
SStateKey* pKtmp = &tkey; SStateKey* pKtmp = &tkey;
@ -2038,7 +2036,6 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
return -1; return -1;
} }
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
stDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) { if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
@ -2049,7 +2046,6 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
} }
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -2088,7 +2084,6 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
} }
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
stDebug("streamStateGetCur_rocksdb");
int32_t code = 0; int32_t code = 0;
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
@ -2129,7 +2124,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
} }
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();

View File

@ -407,8 +407,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer);
} }
} }

View File

@ -136,6 +136,7 @@ class TDDnode:
"tqDebugFlag": "143", "tqDebugFlag": "143",
"cDebugFlag": "143", "cDebugFlag": "143",
"stDebugFlag": "143", "stDebugFlag": "143",
"smaDebugFlag": "143",
"jniDebugFlag": "143", "jniDebugFlag": "143",
"qDebugFlag": "143", "qDebugFlag": "143",
"rpcDebugFlag": "143", "rpcDebugFlag": "143",

View File

@ -23,7 +23,8 @@ from util.cases import *
from util.sql import * from util.sql import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0, 'stdebugflag':135}
# random string # random string
def random_string(self, count): def random_string(self, count):
letters = string.ascii_letters letters = string.ascii_letters

View File

@ -398,7 +398,7 @@ if __name__ == "__main__":
tdDnodes.setValgrind(valgrind) tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll() tdDnodes.stopAll()
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.deploy(dnode.index,{}) tdDnodes.deploy(dnode.index, updateCfgDict)
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index) tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql) tdCases.logSql(logSql)