fix(stream): disable merge submit blocks.

This commit is contained in:
Haojun Liao 2023-09-08 12:46:15 +08:00
parent 55ac835294
commit b317c8ebff
7 changed files with 300 additions and 186 deletions

View File

@ -700,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);

View File

@ -1126,7 +1126,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask);
}

View File

@ -26,7 +26,7 @@ typedef struct STableSinkInfo {
static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static void fillBucket(STokenBucket* pBucket);
@ -136,6 +136,149 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS;
}
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
SArray* tagArray = NULL;
int32_t code = 0;
SVCreateTbBatchReq reqs = {0};
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) {
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
} else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
continue;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
tagVal.nData = varDataLen(pData);
tagVal.pData = (uint8_t*)varDataVal(pData);
} else {
memcpy(&tagVal.i64, pData, pTagData->info.bytes);
}
taosArrayPush(tagArray, &tagVal);
}
}
pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
if (!pDataBlock->info.parTbName[0]) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else {
pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
}
taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
code = tqPutReqToQueue(pVnode, &reqs);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
return code;
}
static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) {
const char* id = pTask->id.idStr;
int32_t vgId = TD_VID(pVnode);
int32_t len = 0;
void* pBuf = NULL;
int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
return code;
}
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
pTask->sinkRecorder.numOfSubmit += 1;
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);
}
return TSDB_CODE_SUCCESS;
}
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@ -151,180 +294,141 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
pTask->tsInfo.sinkStart = taosGetTimestampMs();
}
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, id, numOfBlocks);
bool hasSubmit = false;
SArray* tagArray = NULL;
SArray* pVals = NULL;
SArray* crTblArray = NULL;
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
SVCreateTbBatchReq reqs = {0};
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) {
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal);
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
} else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
continue;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
tagVal.nData = varDataLen(pData);
tagVal.pData = (uint8_t*) varDataVal(pData);
} else {
memcpy(&tagVal.i64, pData, pTagData->info.bytes);
}
taosArrayPush(tagArray, &tagVal);
}
}
pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
if (!pDataBlock->info.parTbName[0]) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else {
pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
}
taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
goto _end;
}
tagArray = taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
crTblArray = NULL;
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else {
hasSubmit = true;
pTask->sinkRecorder.numOfBlocks += 1;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
bool isMixBlocks = true;
for(int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
isMixBlocks = true;
break;
}
}
if (hasSubmit) {
int32_t len = 0;
void* pBuf = NULL;
code = tqBuildSubmitReq(&submitReq, vgId, &pBuf, &len);
if (isMixBlocks) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
goto _end;
}
for(int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else {
pTask->sinkRecorder.numOfBlocks += 1;
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks);
} else {
ASSERT(0);
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
return;
}
pTask->sinkRecorder.numOfSubmit += 1;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit,
(taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0);
code = doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq);
}
}
} else {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
taosHashCleanup(pTableIndexMap);
return;
}
bool hasSubmit = false;
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else {
hasSubmit = true;
pTask->sinkRecorder.numOfBlocks += 1;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid));
if (index == NULL) { // no data yet, append it
taosArrayPush(submitReq.aSubmitTbData, &tbData);
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size));
} else {
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
// merge the new submit table block with the existed blocks
// if ts in the new data block overlap with existed one, replace it
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
int32_t newLen = taosArrayGetSize(tbData.aRowP);
int32_t j = 0, k = 0;
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
while (j < newLen && k < oldLen) {
SRow* pNewRow = taosArrayGetP(tbData.aRowP, j);
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
if (pNewRow->ts <= pOldRow->ts) {
taosArrayPush(pFinal, &pNewRow);
if (pNewRow->ts < pOldRow->ts) {
j += 1;
} else {
j += 1;
k += 1;
}
} else {
taosArrayPush(pFinal, &pOldRow);
k += 1;
}
}
while (j < newLen) {
SRow* pRow = taosArrayGetP(tbData.aRowP, j++);
taosArrayPush(pFinal, &pRow);
}
while (k < oldLen) {
SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
taosArrayPush(pFinal, &pRow);
}
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pExisted->aRowP);
pExisted->aRowP = pFinal;
tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL));
}
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
}
}
taosHashCleanup(pTableIndexMap);
if (hasSubmit) {
doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq);
} else {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
}
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
// TODO: change
}
int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
@ -490,8 +594,19 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int
return TSDB_CODE_SUCCESS;
}
int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
SStreamTask* pTask, SSubmitTbData* pTableData) {
static int32_t tsAscendingSortFn(const void* p1, const void* p2) {
SRow* pRow1 = *(SRow**) p1;
SRow* pRow2 = *(SRow**) p2;
if (pRow1->ts == pRow2->ts) {
return 0;
} else {
return pRow1->ts > pRow2->ts? 1:-1;
}
}
int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t vgId = TD_VID(pVnode);
uint64_t groupId = pDataBlock->info.id.groupId;
@ -507,7 +622,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (pTableData->aRowP == NULL || pVals == NULL) {
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY;
@ -557,7 +672,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
@ -573,7 +688,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
if (!isValid) { // not valid table, ignore it
metaReaderClear(&mr);
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
@ -612,7 +727,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return terrno;
@ -624,7 +739,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
if (!isValid) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
@ -681,7 +796,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return code;
}
@ -690,6 +805,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
taosArrayPush(pTableData->aRowP, &pRow);
}
taosArraySort(pTableData->aRowP, tsAscendingSortFn);
tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
taosArrayDestroy(pVals);

View File

@ -1443,6 +1443,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
TSKEY *aKey = (TSKEY *)(pColData->pData);
vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), pColData->nVal, pSubmitTbData->uid);
for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
@ -1459,7 +1460,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
code = TSDB_CODE_INVALID_MSG;
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
goto _exit;
}
}
@ -1564,6 +1565,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
} else { // create table failed
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
code = terrno;
vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name,
tstrerror(terrno));
goto _exit;
}
terrno = 0;

View File

@ -2257,7 +2257,7 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
continue;

View File

@ -30,6 +30,13 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1;
@ -97,11 +104,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
streamSetParamForScanHistory(pTask);
streamTaskEnablePause(pTask);
}
streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
streamTaskScanHistoryPrepare(pTask);
}
return 0;
}
@ -402,15 +406,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// agg
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus));
return 0;
}
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {

View File

@ -6,7 +6,8 @@ from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)