fix(stream): fix race condition in send msg. (#30277)
Co-authored-by: 54liuyao <54liuyao@163.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: yihaoDeng <luomoxyz@126.com>
This commit is contained in:
parent
b46d79a36c
commit
db57fc4ba8
|
@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
||||||
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
||||||
|
int64_t lastMsgId;
|
||||||
} SStreamUpstreamEpInfo;
|
} SStreamUpstreamEpInfo;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
|
||||||
|
|
|
@ -2547,6 +2547,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
|
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
|
||||||
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
||||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
||||||
|
goto _exit;
|
||||||
if (len >= size - 1) {
|
if (len >= size - 1) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
|
@ -551,7 +551,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
|
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
|
||||||
pStreamCtrlPool->name = "vnode-stream-ctrl";
|
pStreamCtrlPool->name = "vnode-stream-ctrl";
|
||||||
pStreamCtrlPool->max = 1;
|
pStreamCtrlPool->max = 4;
|
||||||
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
||||||
|
|
||||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||||
|
|
|
@ -864,7 +864,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t timeout = 300; // 5min
|
int32_t timeout = 60; // 1min
|
||||||
int64_t start = taosGetTimestampSec();
|
int64_t start = taosGetTimestampSec();
|
||||||
|
|
||||||
while (pTableSinkInfo->uid == 0) {
|
while (pTableSinkInfo->uid == 0) {
|
||||||
|
@ -987,6 +987,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
if (code) {
|
if (code) {
|
||||||
tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
|
tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
|
||||||
return code;
|
return code;
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
|
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
|
||||||
|
|
|
@ -2942,11 +2942,11 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
|
||||||
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
|
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
if (qDebugFlag & DEBUG_INFO) {
|
||||||
char* pBuf = NULL;
|
char* pBuf = NULL;
|
||||||
int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
|
int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
qDebug("%s", pBuf);
|
qInfo("%s", pBuf);
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2962,13 +2962,13 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
|
||||||
pBlock->info.version);
|
pBlock->info.version);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
if (qDebugFlag & DEBUG_INFO) {
|
||||||
char* pBuf = NULL;
|
char* pBuf = NULL;
|
||||||
char flagBuf[64];
|
char flagBuf[64];
|
||||||
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
|
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
|
||||||
int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr);
|
int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
qDebug("%s", pBuf);
|
qInfo("%s", pBuf);
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -949,7 +949,7 @@ void streamBackendCleanup(void* arg) {
|
||||||
streamMutexDestroy(&pHandle->mutex);
|
streamMutexDestroy(&pHandle->mutex);
|
||||||
|
|
||||||
streamMutexDestroy(&pHandle->cfMutex);
|
streamMutexDestroy(&pHandle->cfMutex);
|
||||||
stDebug("vgId:%d destroy stream backend:%p", (int32_t) pHandle->vgId, pHandle);
|
stDebug("vgId:%d destroy stream backend:%p", (int32_t)pHandle->vgId, pHandle);
|
||||||
taosMemoryFree(pHandle);
|
taosMemoryFree(pHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3101,37 +3101,37 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
|
return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
||||||
do { \
|
do { \
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
stWarn("streamState failed to get cf name: %s", funcname); \
|
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \
|
TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \
|
TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||||
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
|
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
|
||||||
rocksdb_t* db = wrapper->db; \
|
rocksdb_t* db = wrapper->db; \
|
||||||
char* ttlV = NULL; \
|
char* ttlV = NULL; \
|
||||||
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
||||||
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
} else { \
|
} else { \
|
||||||
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
stInfo("[InternalERR] write streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, \
|
||||||
ttlVLen, wrapper); \
|
funcname, vLen, ttlVLen, wrapper); \
|
||||||
} \
|
} \
|
||||||
taosMemoryFree(ttlV); \
|
taosMemoryFree(ttlV); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||||
|
@ -4200,22 +4200,16 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
||||||
int32_t res = 0;
|
int32_t res = 0;
|
||||||
SSessionKey tmpKey = *key;
|
SSessionKey tmpKey = *key;
|
||||||
int32_t valSize = *pVLen;
|
int32_t valSize = *pVLen;
|
||||||
void* tmp = taosMemoryMalloc(valSize);
|
|
||||||
if (!tmp) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
|
||||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
if (fn(pKeyData, stateKey) == true) {
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4230,7 +4224,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
if (fn(pKeyData, stateKey) == true) {
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4238,11 +4231,11 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
||||||
|
|
||||||
*key = tmpKey;
|
*key = tmpKey;
|
||||||
res = 1;
|
res = 1;
|
||||||
memset(tmp, 0, valSize);
|
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
taosMemoryFreeClear(*pVal);
|
if (res == 0 && valSize > *pVLen){
|
||||||
*pVal = tmp;
|
stError("[InternalERR] [skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],valSize:%d bigger than get rocksdb len:%d", key->win.skey, key->win.ekey, key->groupId, valSize, *pVLen);
|
||||||
|
}
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -4358,7 +4351,7 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
|
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
|
STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4515,6 +4508,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
|
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
char toString[128] = {0};
|
||||||
|
|
||||||
char* dst = NULL;
|
char* dst = NULL;
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
|
@ -4528,6 +4522,10 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
|
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
|
||||||
|
|
||||||
|
ginitDict[cfIdx].toStrFunc((void*)key, toString);
|
||||||
|
qInfo("[InternalERR] write cfIdx:%d key:%s vlen:%d", cfIdx, toString, vlen);
|
||||||
|
|
||||||
char* ttlV = tmpBuf;
|
char* ttlV = tmpBuf;
|
||||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);
|
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
||||||
|
|
||||||
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
|
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
|
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
|
||||||
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
|
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
|
@ -243,12 +244,13 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
|
||||||
|
|
||||||
void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
||||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||||
|
|
||||||
|
streamMutexLock(&pMsgInfo->lock);
|
||||||
|
|
||||||
if (pMsgInfo->pData != NULL) {
|
if (pMsgInfo->pData != NULL) {
|
||||||
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
|
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexLock(&pMsgInfo->lock);
|
|
||||||
|
|
||||||
pMsgInfo->checkpointId = -1;
|
pMsgInfo->checkpointId = -1;
|
||||||
pMsgInfo->transId = -1;
|
pMsgInfo->transId = -1;
|
||||||
pMsgInfo->pData = NULL;
|
pMsgInfo->pData = NULL;
|
||||||
|
@ -527,6 +529,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) {
|
||||||
streamTaskFreeRefId(param);
|
streamTaskFreeRefId(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) {
|
||||||
|
int32_t code = 0;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||||
|
|
||||||
|
streamMutexLock(&pMsgInfo->lock);
|
||||||
|
|
||||||
|
int32_t msgId = pMsgInfo->msgId;
|
||||||
|
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
|
||||||
|
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId,
|
||||||
|
msgId);
|
||||||
|
|
||||||
|
int32_t numOfRetry = 0;
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
|
||||||
|
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
|
||||||
|
if (pEntry == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// downstream not rsp yet beyond threshold that is 10s
|
||||||
|
if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data
|
||||||
|
doSendFailedDispatch(pTask, pEntry, now, "timeout");
|
||||||
|
numOfRetry += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// downstream inputQ is closed
|
||||||
|
if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
|
||||||
|
numOfRetry += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle other errors
|
||||||
|
if (pEntry->status != TSDB_CODE_SUCCESS) {
|
||||||
|
doSendFailedDispatch(pTask, pEntry, now, "downstream error");
|
||||||
|
numOfRetry += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry,
|
||||||
|
msgId);
|
||||||
|
} else {
|
||||||
|
int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||||
|
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
||||||
|
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
|
|
||||||
|
int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
|
||||||
|
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
|
||||||
|
if (pEntry != NULL) {
|
||||||
|
setResendInfo(pEntry, now);
|
||||||
|
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
|
||||||
|
|
||||||
|
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
|
||||||
|
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
stError("s-task:%s invalid index 0, size:%d", id, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMutexUnlock(&pMsgInfo->lock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static void doMonitorDispatchData(void* param, void* tmrId) {
|
static void doMonitorDispatchData(void* param, void* tmrId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
@ -590,65 +662,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
code = sendFailedDispatchData(pTask, now);
|
||||||
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
|
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
|
|
||||||
pTask->info.selfChildId, msgId);
|
|
||||||
|
|
||||||
int32_t numOfRetry = 0;
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
|
|
||||||
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
|
|
||||||
if (pEntry == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// downstream not rsp yet beyond threshold that is 10s
|
|
||||||
if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data
|
|
||||||
doSendFailedDispatch(pTask, pEntry, now, "timeout");
|
|
||||||
numOfRetry += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// downstream inputQ is closed
|
|
||||||
if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
|
|
||||||
doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
|
|
||||||
numOfRetry += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle other errors
|
|
||||||
if (pEntry->status != TSDB_CODE_SUCCESS) {
|
|
||||||
doSendFailedDispatch(pTask, pEntry, now, "downstream error");
|
|
||||||
numOfRetry += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
|
|
||||||
numOfRetry, msgId);
|
|
||||||
} else {
|
|
||||||
int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
|
|
||||||
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
|
||||||
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
|
||||||
|
|
||||||
int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
|
|
||||||
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
|
|
||||||
if (pEntry != NULL) {
|
|
||||||
setResendInfo(pEntry, now);
|
|
||||||
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
|
|
||||||
|
|
||||||
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
|
|
||||||
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
stError("s-task:%s invalid index 0, size:%d", id, s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamTaskShouldStop(pTask)) {
|
if (streamTaskShouldStop(pTask)) {
|
||||||
stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||||
|
@ -887,7 +901,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
|
|
||||||
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
|
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
|
||||||
|
|
||||||
// todo: secure the timerActive and start timer in after lock pTask->lock
|
// todo: start timer in after lock pTask->lock
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
bool shouldStop = streamTaskShouldStop(pTask);
|
bool shouldStop = streamTaskShouldStop(pTask);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
@ -897,7 +911,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
streamMutexLock(&pTask->msgInfo.lock);
|
streamMutexLock(&pTask->msgInfo.lock);
|
||||||
if (pTask->msgInfo.inMonitor == 0) {
|
if (pTask->msgInfo.inMonitor == 0) {
|
||||||
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
|
stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
|
@ -1850,6 +1863,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s lastMsgId:%"PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId,
|
||||||
|
pReq->upstreamNodeId);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
||||||
status = TASK_INPUT_STATUS__REFUSED;
|
status = TASK_INPUT_STATUS__REFUSED;
|
||||||
|
@ -1874,7 +1890,21 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
|
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
status = streamTaskAppendInputBlocks(pTask, pReq);
|
if (pReq->msgId > pInfo->lastMsgId) {
|
||||||
|
status = streamTaskAppendInputBlocks(pTask, pReq);
|
||||||
|
if (status == TASK_INPUT_STATUS__NORMAL) {
|
||||||
|
stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %d", id, pInfo->lastMsgId, pReq->msgId);
|
||||||
|
pInfo->lastMsgId = pReq->msgId;
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stWarn(
|
||||||
|
"s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "
|
||||||
|
"msgId:%" PRId64,
|
||||||
|
id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId);
|
||||||
|
status = TASK_INPUT_STATUS__NORMAL; // still return success
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -881,7 +881,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||||
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
|
stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id);
|
||||||
streamTaskSetIdleInfo(pTask, 500);
|
streamTaskSetIdleInfo(pTask, 500);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1375,13 +1375,23 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
|
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
|
||||||
if (!isLeader) {
|
if (!isLeader) {
|
||||||
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
|
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
|
||||||
|
} else { // wait for nodeep update if become leader from follower
|
||||||
|
if (prevStage == NODE_ROLE_FOLLOWER) {
|
||||||
|
pMeta->startInfo.tasksWillRestart = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
|
if (prevStage == NODE_ROLE_FOLLOWER) {
|
||||||
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
|
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64
|
||||||
|
" restart after nodeEp being updated",
|
||||||
|
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
|
||||||
|
} else {
|
||||||
|
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
|
||||||
|
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
|
||||||
|
}
|
||||||
streamMetaStartHb(pMeta);
|
streamMetaStartHb(pMeta);
|
||||||
} else {
|
} else {
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
||||||
|
|
|
@ -149,10 +149,16 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
pNewPos->beFlushed = true;
|
pNewPos->beFlushed = true;
|
||||||
|
int32_t len = getRowStateRowSize(pFileState);
|
||||||
if (p) {
|
if (p) {
|
||||||
memcpy(pNewPos->pRowBuff, p, *pVLen);
|
if (*pVLen > len){
|
||||||
|
qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len);
|
||||||
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}else{
|
||||||
|
memcpy(pNewPos->pRowBuff, p, *pVLen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t len = getRowStateRowSize(pFileState);
|
|
||||||
memset(pNewPos->pRowBuff, 0, len);
|
memset(pNewPos->pRowBuff, 0, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -296,7 +296,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
||||||
|
|
||||||
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -315,7 +314,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
|
||||||
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
||||||
|
|
||||||
if (streamTaskShouldStop(pTask)) { // record the failure
|
if (streamTaskShouldStop(pTask)) { // record the failure
|
||||||
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
|
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
|
||||||
pInfo->hTaskId.taskId);
|
pInfo->hTaskId.taskId);
|
||||||
|
|
||||||
|
|
|
@ -465,7 +465,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
streamMetaRLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
@ -473,7 +473,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
if (num == 0) {
|
if (num == 0) {
|
||||||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
|
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -482,7 +482,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
// send hb msg to mnode before closing all tasks.
|
// send hb msg to mnode before closing all tasks.
|
||||||
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
|
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,7 +509,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
|
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
|
||||||
pEpInfo->nodeId = pTask->info.nodeId;
|
pEpInfo->nodeId = pTask->info.nodeId;
|
||||||
pEpInfo->taskId = pTask->id.taskId;
|
pEpInfo->taskId = pTask->id.taskId;
|
||||||
pEpInfo->stage = -1;
|
pEpInfo->stage = -1;
|
||||||
|
pEpInfo->lastMsgId = -1;
|
||||||
|
|
||||||
return pEpInfo;
|
return pEpInfo;
|
||||||
}
|
}
|
||||||
|
|
|
@ -960,6 +960,7 @@ int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
|
|
||||||
if (vlen != pFileState->rowSize) {
|
if (vlen != pFileState->rowSize) {
|
||||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],vlen:%d, rowSize:%d", key.win.skey, key.win.ekey, key.groupId, vlen, pFileState->rowSize);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue