Merge branch 'enh/rocksdbSstate' into enh/rocksdbSstateMerge

This commit is contained in:
yihaoDeng 2023-05-09 08:58:51 +00:00
commit 55014aa003
25 changed files with 275 additions and 156 deletions

View File

@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetOpen(qTaskInfo_t tinfo);

View File

@ -278,6 +278,7 @@ typedef struct SCheckpointInfo {
typedef struct SStreamStatus { typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t schedStatus; int8_t schedStatus;
int8_t keepTaskStatus;
} SStreamStatus; } SStreamStatus;
struct SStreamTask { struct SStreamTask {
@ -542,6 +543,7 @@ int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);

View File

@ -138,6 +138,8 @@ typedef struct {
int8_t enableRef; int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct SWalReader SWalReader;
// todo hide this struct // todo hide this struct
typedef struct SWalReader { typedef struct SWalReader {
SWal *pWal; SWal *pWal;
@ -193,9 +195,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead); void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader); void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead); int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader); int64_t walReaderGetCurrentVer(const SWalReader* pReader);
int64_t walReaderGetValidFirstVer(const SWalReader* pReader);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);

View File

@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER); SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER);
SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER) SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER)
useDb = taosHashIterate(pUser->writeTbs, useDb); useDb = taosHashIterate(pUser->useDbs, useDb);
} }
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)

View File

@ -167,9 +167,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t tqOffsetCommitFile(STqOffsetStore* pStore); int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
SBatchDeleteReq* deleteReq); const char* pIdStr);
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data); void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset // tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer); char* tqOffsetBuildFName(const char* path, int32_t fVer);

View File

@ -250,7 +250,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid; pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, "");
continue; continue;
} }

View File

@ -169,7 +169,7 @@ void tqNotifyClose(STQ* pTq) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
@ -637,7 +637,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->smaSink.smaSink = smaHandleRes; pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) { } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
int32_t ver1 = 1; int32_t ver1 = 1;
SMetaInfo info = {0}; SMetaInfo info = {0};
@ -821,17 +821,19 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
} }
// do recovery step 1 // do recovery step 1
tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr); tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
streamSourceRecoverScanStep1(pTask); streamSourceRecoverScanStep1(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el); tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
// build msg to launch next step // build msg to launch next step
SStreamRecoverStep2Req req; SStreamRecoverStep2Req req;
@ -842,7 +844,6 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0; return 0;
} }
@ -852,13 +853,14 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
void* serializedReq = rpcMallocCont(len); void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) { if (serializedReq == NULL) {
tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
return -1; return -1;
} }
memcpy(serializedReq, &req, len); memcpy(serializedReq, &req, len);
// dispatch msg // dispatch msg
tqDebug("s-task:%s start recover block stage", pTask->id.idStr); tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
@ -870,12 +872,16 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
int32_t code = 0; int32_t code = 0;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
// do recovery step 2 // do recovery step 2
int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);
code = streamSourceRecoverScanStep2(pTask, sversion); code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@ -895,12 +901,16 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
// set status normal // set status normal
tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
code = streamSetStatusNormal(pTask); code = streamSetStatusNormal(pTask);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
double el = (taosGetTimestampMs() - st)/ 1000.0;
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task // dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishReq(pTask); code = streamDispatchRecoverFinishReq(pTask);
if (code < 0) { if (code < 0) {
@ -912,7 +922,6 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
streamMetaSaveTask(pTq->pStreamMeta, pTask); streamMetaSaveTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
@ -1178,6 +1187,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} }
@ -1188,7 +1198,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
streamSetStatusNormal(pTask); atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version // no lock needs to secure the access of the version
if (pReq->igUntreated) { // discard all the data when the stream task is suspended. if (pReq->igUntreated) { // discard all the data when the stream task is suspended.
@ -1325,12 +1335,12 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr()); tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = WAL_READ_TASKS_ID; pRunReq->taskId = WAL_READ_TASKS_ID;

View File

@ -295,7 +295,7 @@ void tqCloseReader(STqReader* pReader) {
} }
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
return -1; return -1;
} }
tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id); tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);

View File

@ -105,20 +105,20 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t status = pTask->status.taskStatus; int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) { if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if (tInputQueueIsFull(pTask)) { if (tInputQueueIsFull(pTask)) {
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
@ -126,17 +126,37 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false; *pScanIdle = false;
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit if (pTask->chkInfo.currentVer < firstVer) {
streamMetaReleaseTask(pStreamMeta, pTask); pTask->chkInfo.currentVer = firstVer;
continue; tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
// todo need retry if failed
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
} else {
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (currentVer != pTask->chkInfo.currentVer) {
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
}
} }
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
SPackedData packData = {0}; SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
if (code != TSDB_CODE_SUCCESS) { // failed, continue if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;

View File

@ -17,23 +17,24 @@
#include "tmsg.h" #include "tmsg.h"
#include "tq.h" #include "tq.h"
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
SBatchDeleteReq* deleteReq) { const char* pIdStr) {
int32_t totRow = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
tqDebug("stream delete msg: row %d", totRow); tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totalRows; row++) {
int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row); int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name; char* name;
void* varTbName = NULL; void* varTbName = NULL;
if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) { if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
varTbName = colDataGetVarData(pTbNameCol, row); varTbName = colDataGetVarData(pTbNameCol, row);
} }
@ -43,31 +44,17 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
} else { } else {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
} }
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
pVnode->config.vgId, groupId, name, startTs, endTs);
#if 0
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, name) < 0) {
metaReaderClear(&mr);
tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
taosMemoryFree(name);
continue;
}
int64_t uid = mr.me.uid; tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64,
metaReaderClear(&mr); pIdStr, groupId, name, skey, ekey);
taosMemoryFree(name);
#endif SSingleDeleteReq req = { .startTs = skey, .endTs = ekey};
SSingleDeleteReq req = {
.startTs = startTs,
.endTs = endTs,
};
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
taosMemoryFree(name); taosMemoryFree(name);
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
taosArrayPush(deleteReq->deleteReqs, &req); taosArrayPush(deleteReq->deleteReqs, &req);
} }
return 0; return 0;
} }
@ -108,12 +95,7 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
int32_t tlen = 0; int32_t tlen = 0;
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = { SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen };
.msgType = TDMT_VND_CREATE_TABLE,
.pCont = buf,
.contLen = tlen,
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqError("failed to put into write-queue since %s", terrstr()); tqError("failed to put into write-queue since %s", terrstr());
} }
@ -121,13 +103,12 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid; int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName; char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema; STSchema* pTSchema = pTask->tbSink.pTSchema;
/*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/
int32_t blockSz = taosArrayGetSize(pBlocks); int32_t blockSz = taosArrayGetSize(pBlocks);
@ -141,11 +122,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t i = 0; i < blockSz; i++) { for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0}; SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
deleteReq.suid = suid; tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs); taosArrayDestroy(deleteReq.deleteReqs);
continue; continue;
@ -154,10 +135,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t len; int32_t len;
int32_t code; int32_t code;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) { if (code != TSDB_CODE_SUCCESS) {
// qError("s-task:%s failed to encode delete request", pTask->id.idStr);
ASSERT(0);
} }
SEncoder encoder; SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
@ -168,11 +149,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
SRpcMsg msg = { SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) };
.msgType = TDMT_VND_BATCH_DEL,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
@ -182,6 +159,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (NULL == reqs.pArray) { if (NULL == reqs.pArray) {
goto _end; goto _end;
} }
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq; SVCreateTbReq* pCreateTbReq = &createTbReq;
@ -204,11 +182,13 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
STagVal tagVal = { STagVal tagVal = {
.cid = pTSchema->numOfCols + 1, .cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT, .type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId, .i64 = (int64_t)pDataBlock->info.id.groupId,
}; };
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
// set tag name // set tag name
@ -271,7 +251,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
crTblArray = NULL; crTblArray = NULL;
} else { } else {
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end; goto _end;
@ -405,8 +385,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} else { } else {
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) { if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv = // address copy, no value
(SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {

View File

@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) { if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1; int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {

View File

@ -3164,6 +3164,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pReader->code != TSDB_CODE_SUCCESS) {
return pReader->code;
}
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
return terrno; return terrno;
@ -5078,6 +5082,11 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
if (pReader->code != TSDB_CODE_SUCCESS) {
return NULL;
}
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
return NULL; return NULL;

View File

@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
} }
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1084,12 +1092,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pScanBaseInfo->dataReader); tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL; pScanBaseInfo->dataReader = NULL;
// let's seek to the next version in wal file verifyOffset(pInfo->tqReader->pWalReader, pOffset);
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1; return -1;

View File

@ -2251,8 +2251,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete,
pEnryInfo->isNullRes, pEnryInfo->numOfRes);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) { if (TAOS_FAILED(code1)) {
@ -2274,6 +2273,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
} }
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -16,7 +16,8 @@
#include "streamInc.h" #include "streamInc.h"
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 102400 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
int32_t streamInit() { int32_t streamInit() {
int8_t old; int8_t old;
@ -52,7 +53,7 @@ void streamCleanUp() {
void streamSchedByTimer(void* param, void* tmrId) { void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask); streamMetaReleaseTask(NULL, pTask);
return; return;
} }
@ -216,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
pReq->upstreamNodeId); pReq->upstreamNodeId);
// todo add the input queue buffer limitation // todo add the input queue buffer limitation
@ -253,6 +254,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
ASSERT(0); ASSERT(0);
return 0; return 0;
} }
// continue dispatch // continue dispatch
streamDispatch(pTask); streamDispatch(pTask);
return 0; return 0;
@ -294,13 +296,18 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1; return -1;
} }
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, total);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, numOfBlocks, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
numOfBlocks, size);
streamDataSubmitDestroy(pSubmitBlock); streamDataSubmitDestroy(pSubmitBlock);
return -1; return -1;
} }
@ -308,13 +315,18 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
size);
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks);
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);

View File

@ -68,7 +68,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
} }
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen);
if (pDataSubmit == NULL) { if (pDataSubmit == NULL) {
return NULL; return NULL;
} }
@ -128,7 +128,12 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit)
} }
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen;
}
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }

View File

@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
buf = rpcMallocCont(sizeof(SMsgHead) + tlen); buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -270,8 +271,12 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
goto FAIL; if (buf) {
rpcFreeCont(buf);
}
return code;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead); msg.contLen = tlen + sizeof(SMsgHead);
@ -280,13 +285,10 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
msg.info.noResp = 1; msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr,
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId); pReq->taskId, vgId);
return 0; return 0;
FAIL:
if (buf) rpcFreeCont(buf);
return code;
} }
int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
@ -407,13 +409,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId; req.taskId = downstreamTaskId;
qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
pTask->selfChildId, blockNum, downstreamTaskId, vgId); pTask->selfChildId, blockNum, downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH; goto FAIL_FIXED_DISPATCH;
} }
code = 0; code = 0;
FAIL_FIXED_DISPATCH: FAIL_FIXED_DISPATCH:
taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen); taosArrayDestroy(req.dataLen);
@ -427,6 +431,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t vgSz = taosArrayGetSize(vgInfo); int32_t vgSz = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
if (pReqs == NULL) { if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -442,6 +447,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
pReqs[i].taskId = pVgInfo->taskId; pReqs[i].taskId = pVgInfo->taskId;
} }
@ -468,43 +474,52 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
} }
} }
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
blockNum, vgSz);
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) { if (pReqs[i].blockNum > 0) {
// send
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
} }
} }
code = 0; code = 0;
FAIL_SHUFFLE_DISPATCH: FAIL_SHUFFLE_DISPATCH:
if (pReqs) { for (int32_t i = 0; i < vgSz; i++) {
for (int32_t i = 0; i < vgSz; i++) { taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen);
taosArrayDestroy(pReqs[i].dataLen);
}
taosMemoryFree(pReqs);
} }
return code; taosMemoryFree(pReqs);
} }
return 0; return code;
} }
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr,
taosQueueItemSize(pTask->outputQueue->queue)); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
numOfElems);
}
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr);
return 0; return 0;
} }
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) { if (pBlock == NULL) {
qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr); qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0; return 0;
} }
@ -516,10 +531,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
code = -1; code = -1;
streamQueueProcessFail(pTask->outputQueue); streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
goto FREE;
} }
FREE:
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
return code; return code;

View File

@ -15,13 +15,19 @@
#include "streamInc.h" #include "streamInc.h"
#define STREAM_EXEC_MAX_BATCH_NUM 20480 #define MAX_STREAM_EXEC_BATCH_NUM 10240
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
} }
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
@ -141,7 +147,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return 0; return 0;
} }
@ -167,8 +173,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
batchCnt++; batchCnt++;
qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz); qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
if (batchCnt >= batchSz) { if (batchCnt >= batchSz) {
break; break;
} }
@ -201,7 +206,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
} }
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask); streamDispatch(pTask);
} }
@ -255,6 +260,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t batchSize = 1; int32_t batchSize = 1;
void* pInput = NULL; void* pInput = NULL;
int16_t times = 0;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
@ -262,6 +268,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
times++;
taosMsleep(1);
qDebug("===stream===try agian batchSize:%d", batchSize);
continue;
}
break; break;
} }
@ -280,7 +293,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
batchSize++; batchSize++;
pInput = newRet; pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
break; break;
} }
} }

View File

@ -54,7 +54,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) { if (pMeta->pTasks == NULL) {
goto _err; goto _err;
} }
@ -251,11 +251,13 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
taosWLockLatch(&pMeta->lock); // taosWLockLatch(&pMeta->lock);
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
@ -273,8 +275,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
} }
taosWUnLockLatch(&pMeta->lock);
} }
int32_t streamMetaBegin(SStreamMeta* pMeta) { int32_t streamMetaBegin(SStreamMeta* pMeta) {

View File

@ -225,7 +225,7 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); qDebug("s-task:%s recover step2 (blocking stage) started", pTask->id.idStr);
if (qStreamSourceRecoverStep2(exec, ver) < 0) { if (qStreamSourceRecoverStep2(exec, ver) < 0) {
} }
@ -233,12 +233,13 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
} }
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId };
.streamId = pTask->id.streamId,
.childId = pTask->selfChildId,
};
// serialize // serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr,
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
req.taskId = pTask->fixedEpDispatcher.taskId; req.taskId = pTask->fixedEpDispatcher.taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {

View File

@ -25,7 +25,7 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
#define MAX_TABLE_NAME_NUM 100000 #define MAX_TABLE_NAME_NUM 2000000
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) { if (pWin1->groupId > pWin2->groupId) {

View File

@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
} }
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
@ -183,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (pReader->curFileFirstVer != pRet->firstVer) { if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner // error code was set inner
if (walReadChangeFile(pReader, pRet->firstVer) < 0) { if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
@ -202,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal; SWal *pWal = pReader->pWal;
if (ver == pReader->curVersion) { if (ver == pReader->curVersion) {
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
@ -232,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curVersion != fetchVer) { if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReaderSeekVer(pRead, fetchVer) < 0) {
return -1; return -1;
} }
seeked = true; seeked = true;
@ -336,7 +338,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} }
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReaderSeekVer(pRead, ver);
if (code < 0) { if (code < 0) {
// pRead->curVersion = ver; // pRead->curVersion = ver;
// pRead->curInvalid = 1; // pRead->curInvalid = 1;
@ -471,7 +473,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
taosThreadMutexLock(&pReader->mutex); taosThreadMutexLock(&pReader->mutex);
if (pReader->curVersion != ver) { if (pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) { if (walReaderSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;

View File

@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
bool empty = false; bool empty = false;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
empty = true; empty = true;
} }
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
pNode->next = NULL; pNode->next = NULL;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->memLimit, tstrerror(code)); queue->memLimit, tstrerror(code));
@ -185,7 +185,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
queue->tail = pNode; queue->tail = pNode;
} }
queue->numOfItems++; queue->numOfItems++;
queue->memOfItems += pNode->size; queue->memOfItems += (pNode->size + pNode->dataSize);
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; queue->numOfItems--;
queue->memOfItems -= pNode->size; queue->memOfItems -= (pNode->size + pNode->dataSize);
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
// queue->numOfItems--; // queue->numOfItems--;
queue->memOfItems -= pNode->size; queue->memOfItems -= (pNode->size + pNode->dataSize);
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,

View File

@ -58,16 +58,16 @@ if $data23 != 0 then
return -1 return -1
endi endi
print ========== stop dnode2 #print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL #system sh/exec.sh -n dnode2 -s stop -x SIGKILL
sleep 1000 #sleep 1000
print =============== drop database #print =============== drop database
sql_error drop database d1 sql drop database d1
print ========== start dnode2 #print ========== start dnode2
system sh/exec.sh -n dnode2 -s start #system sh/exec.sh -n dnode2 -s start
sleep 1000 #sleep 1000
print =============== re-create database print =============== re-create database
$x = 0 $x = 0

View File

@ -190,6 +190,46 @@ if $data11 != 1 then
goto loop2 goto loop2
endi endi
sql pause stream streams2;
sql insert into t1 values(1648791223003,2,2,3,1.1);
sql insert into t1 values(1648791233003,2,2,3,1.1);
sql resume stream IGNORE UNTREATED streams2;
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 4 select * from streamt2;
sql select * from streamt2;
if $rows != 2 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
endi
if $data01 != 1 then
print =====data01=$data01
goto loop3
endi
if $data11 != 1 then
print =====data01=$data01
goto loop3
endi
print ===== step 2 over print ===== step 2 over