Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-22710
This commit is contained in:
commit
2a8f7e121a
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG 61cbfd2
|
GIT_TAG 1e15545
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -191,21 +191,45 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_res_t code;
|
tmq_conf_res_t code;
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||||
if (TMQ_CONF_OK != code) return NULL;
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
|
|
@ -1,30 +0,0 @@
|
||||||
FROM ubuntu:18.04
|
|
||||||
|
|
||||||
WORKDIR /root
|
|
||||||
|
|
||||||
ARG pkgFile
|
|
||||||
ARG dirName
|
|
||||||
ARG cpuType
|
|
||||||
RUN echo ${pkgFile} && echo ${dirName}
|
|
||||||
|
|
||||||
RUN apt update
|
|
||||||
RUN apt install -y curl
|
|
||||||
|
|
||||||
COPY ${pkgFile} /root/
|
|
||||||
ENV TINI_VERSION v0.19.0
|
|
||||||
ENV TAOS_DISABLE_ADAPTER 1
|
|
||||||
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
|
||||||
WORKDIR /root/
|
|
||||||
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
|
|
||||||
|
|
||||||
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
|
|
||||||
LC_CTYPE=en_US.UTF-8 \
|
|
||||||
LANG=en_US.UTF-8 \
|
|
||||||
LC_ALL=en_US.UTF-8
|
|
||||||
COPY ./run.sh /usr/bin/
|
|
||||||
COPY ./bin/* /usr/bin/
|
|
||||||
|
|
||||||
ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
|
|
||||||
CMD ["bash", "-c", "/usr/bin/run.sh"]
|
|
||||||
VOLUME [ "/var/lib/taos", "/var/log/taos" ]
|
|
|
@ -1033,6 +1033,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
|
||||||
offset += pInfo->pColData->info.bytes;
|
offset += pInfo->pColData->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(buf);
|
||||||
return phelper;
|
return phelper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2370,7 +2371,11 @@ _end:
|
||||||
taosArrayDestroy(pVals);
|
taosArrayDestroy(pVals);
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
*ppReq = NULL;
|
*ppReq = NULL;
|
||||||
if (pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
|
if (pReq) {
|
||||||
|
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
|
||||||
|
taosMemoryFreeClear(pReq);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
*ppReq = pReq;
|
*ppReq = pReq;
|
||||||
|
|
|
@ -93,7 +93,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
/*
|
||||||
|
pDnode is null, TD-22618
|
||||||
|
at trans.c line 91
|
||||||
|
before this line, dmProcessRpcMsg callback is set
|
||||||
|
after this line, parent is set
|
||||||
|
so when dmProcessRpcMsg is called, pDonde is still null.
|
||||||
|
*/
|
||||||
|
if (pDnode != NULL){
|
||||||
|
if(pDnode->status != DND_STAT_RUNNING) {
|
||||||
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
||||||
dmProcessServerStartupStatus(pDnode, pRpc);
|
dmProcessServerStartupStatus(pDnode, pRpc);
|
||||||
return;
|
return;
|
||||||
|
@ -106,6 +114,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
|
if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
|
||||||
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
|
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
|
||||||
|
|
|
@ -864,6 +864,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
if (!existing) {
|
if (!existing) {
|
||||||
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
|
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
|
||||||
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
|
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(addedTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
|
|
|
@ -629,7 +629,7 @@ void mndDumpSdb() {
|
||||||
}
|
}
|
||||||
taosWriteFile(pFile, pCont, contLen);
|
taosWriteFile(pFile, pCont, contLen);
|
||||||
taosWriteFile(pFile, "\n", 1);
|
taosWriteFile(pFile, "\n", 1);
|
||||||
taosFsyncFile(pFile);
|
UNUSED(taosFsyncFile(pFile));
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
tjsonDelete(json);
|
tjsonDelete(json);
|
||||||
taosMemoryFree(pCont);
|
taosMemoryFree(pCont);
|
||||||
|
|
|
@ -354,7 +354,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
int32_t dataIndex = 0;
|
int32_t dataIndex = 0;
|
||||||
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||||
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||||
if (i < pos->slotId) {
|
if (nullIndex >= numOfNULL || i < pos->slotId) {
|
||||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||||
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||||
|
@ -722,8 +722,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
|
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
|
||||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
// create stb for stream
|
// create stb for stream
|
||||||
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
|
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
|
||||||
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||||
|
|
|
@ -706,7 +706,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ASSERTS(sver > 0, __FILE__, __LINE__, "failed to get table schema version: %d", sver)) {
|
if (ASSERTS(sver > 0, "failed to get table schema version: %d", sver)) {
|
||||||
code = TSDB_CODE_NOT_FOUND;
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
taosArrayPush(tagArray, &tagVal);
|
taosArrayPush(tagArray, &tagVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pCreateTbReq->ctb.tagNum = size;
|
pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
|
@ -510,15 +510,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||||
|
|
||||||
// set table name
|
// set table name
|
||||||
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
if (!pDataBlock->info.parTbName[0]) {
|
||||||
if (colDataIsNull_s(pTbColInfo, rowId)) {
|
|
||||||
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||||
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
|
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
|
||||||
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
|
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
|
||||||
} else {
|
} else {
|
||||||
void* pTbData = colDataGetData(pTbColInfo, rowId);
|
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
|
||||||
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
|
|
||||||
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
|
|
||||||
}
|
}
|
||||||
taosArrayPush(reqs.pArray, pCreateTbReq);
|
taosArrayPush(reqs.pArray, pCreateTbReq);
|
||||||
}
|
}
|
||||||
|
|
|
@ -813,7 +813,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||||
if (!state->pBlockData) {
|
if (!state->pBlockData) {
|
||||||
state->pBlockData = &state->blockData;
|
state->pBlockData = &state->blockData;
|
||||||
|
|
||||||
tBlockDataCreate(&state->blockData);
|
code = tBlockDataCreate(&state->blockData);
|
||||||
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case SFSNEXTROW_BLOCKDATA:
|
case SFSNEXTROW_BLOCKDATA:
|
||||||
|
|
|
@ -433,9 +433,11 @@ _end:
|
||||||
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
|
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
|
||||||
taosThreadMutexUnlock(&pr->readerMutex);
|
taosThreadMutexUnlock(&pr->readerMutex);
|
||||||
|
|
||||||
|
if (pRes != NULL) {
|
||||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||||
taosMemoryFree(pRes[j]);
|
taosMemoryFree(pRes[j]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pRes);
|
taosMemoryFree(pRes);
|
||||||
taosArrayDestroyEx(pLastCols, freeItem);
|
taosArrayDestroyEx(pLastCols, freeItem);
|
||||||
|
|
|
@ -219,7 +219,7 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid);
|
ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid);
|
||||||
ASSERT(pIter->rowInfo.uid = pIter->dIter.bData.uid);
|
ASSERT(pIter->rowInfo.uid == pIter->dIter.bData.uid);
|
||||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
|
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
|
||||||
pIter->dIter.iRow++;
|
pIter->dIter.iRow++;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
|
|
@ -148,10 +148,10 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi
|
||||||
|
|
||||||
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t size;
|
int64_t size = 0;
|
||||||
int64_t n;
|
int64_t n;
|
||||||
TdFilePtr pFD;
|
TdFilePtr pFD;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN] = {0};
|
||||||
char hdr[TSDB_FHDR_SIZE] = {0};
|
char hdr[TSDB_FHDR_SIZE] = {0};
|
||||||
|
|
||||||
// truncate
|
// truncate
|
||||||
|
|
|
@ -473,7 +473,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
|
||||||
int8_t forward) {
|
int8_t forward) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t level;
|
int8_t level;
|
||||||
SMemSkipListNode *pNode;
|
SMemSkipListNode *pNode = NULL;
|
||||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||||
int64_t nSize;
|
int64_t nSize;
|
||||||
|
|
||||||
|
@ -591,7 +591,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
|
pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
|
||||||
if (pBlockData->aColData == NULL) {
|
if (pBlockData->aColData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
|
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
|
||||||
code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
|
code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
|
@ -874,7 +874,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
pBlockNum->numOfBlocks += 1;
|
pBlockNum->numOfBlocks += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
|
||||||
numOfQTable += 1;
|
numOfQTable += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4532,7 +4532,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
if (pResBlock->pBlockAgg == NULL) {
|
if (pResBlock->pBlockAgg == NULL) {
|
||||||
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
||||||
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
|
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
// do fill all null column value SMA info
|
// do fill all null column value SMA info
|
||||||
|
|
|
@ -230,12 +230,19 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:
|
||||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||||
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
||||||
ASSERT(0);
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
|
goto _end;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||||
if (colDataIsNull_s(pColInfoData, j)) {
|
if (colDataIsNull_s(pColInfoData, j)) {
|
||||||
|
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
||||||
|
qError("NULL value for primary key");
|
||||||
|
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
} else {
|
} else {
|
||||||
|
@ -256,7 +263,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||||
ASSERT(0);
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -296,7 +304,7 @@ _end:
|
||||||
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
|
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_FAILED;
|
return terrno;
|
||||||
}
|
}
|
||||||
*ppReq = pReq;
|
*ppReq = pReq;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1148,7 +1148,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
REPLACE_NODE(pNew);
|
REPLACE_NODE(pNew);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(keyBuf);
|
|
||||||
nodesDestroyList(groupNew);
|
nodesDestroyList(groupNew);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1166,7 +1165,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||||
if (tTagIsJson(data)) {
|
if (tTagIsJson(data)) {
|
||||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
taosMemoryFree(keyBuf);
|
|
||||||
nodesDestroyList(groupNew);
|
nodesDestroyList(groupNew);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
|
|
|
@ -2035,7 +2035,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
||||||
tDecoderClear(&mr.coder);
|
tDecoderClear(&mr.coder);
|
||||||
|
|
||||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||||
metaGetTableEntryByUidCache(&mr, suid);
|
code = metaGetTableEntryByUidCache(&mr, suid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||||
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
|
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -992,32 +992,42 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
|
||||||
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
||||||
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
pTmpBlock->info.id.groupId = groupId;
|
pTmpBlock->info.id.groupId = groupId;
|
||||||
|
char* tbName = pSrcBlock->info.parTbName;
|
||||||
if (pTableSup->numOfExprs > 0) {
|
if (pTableSup->numOfExprs > 0) {
|
||||||
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
||||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||||
void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
|
|
||||||
char* tbName = pSrcBlock->info.parTbName;
|
|
||||||
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
int32_t len = 0;
|
||||||
|
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
|
||||||
|
len = TMIN(sizeof(TSDB_DATA_NULL_STR), TSDB_TABLE_NAME_LEN - 1);
|
||||||
|
memcpy(tbName, TSDB_DATA_NULL_STR, len);
|
||||||
|
} else {
|
||||||
|
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
|
||||||
|
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||||
memcpy(tbName, varDataVal(pData), len);
|
memcpy(tbName, varDataVal(pData), len);
|
||||||
|
}
|
||||||
streamStatePutParName(pState, groupId, tbName);
|
streamStatePutParName(pState, groupId, tbName);
|
||||||
memcpy(pTmpBlock->info.parTbName, tbName, len);
|
memcpy(pTmpBlock->info.parTbName, tbName, len);
|
||||||
pDestBlock->info.rows--;
|
pDestBlock->info.rows--;
|
||||||
} else {
|
} else {
|
||||||
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||||
colDataAppendNULL(pTbNameCol, pDestBlock->info.rows);
|
colDataAppendNULL(pTbNameCol, pDestBlock->info.rows);
|
||||||
pSrcBlock->info.parTbName[0] = 0;
|
tbName[0] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTagSup->numOfExprs > 0) {
|
if (pTagSup->numOfExprs > 0) {
|
||||||
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
|
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
|
||||||
pDestBlock->info.rows--;
|
pDestBlock->info.rows--;
|
||||||
|
} else {
|
||||||
|
memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||||
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
||||||
pDestBlock->info.rows++;
|
pDestBlock->info.rows++;
|
||||||
blockDataDestroy(pTmpBlock);
|
blockDataDestroy(pTmpBlock);
|
||||||
|
} else {
|
||||||
|
memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
|
||||||
}
|
}
|
||||||
streamStateReleaseBuf(pState, NULL, pValue);
|
streamStateReleaseBuf(pState, NULL, pValue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,7 +275,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
// for stream interval
|
// for stream interval
|
||||||
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
// printDataBlock1(pBlock, "project1");
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3050,8 +3050,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||||
strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
||||||
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
int32_t code =
|
int32_t code =
|
||||||
|
@ -56,7 +57,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
pOperator->exprSupp.pCtx =
|
||||||
|
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -68,8 +70,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
|
||||||
|
|
||||||
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
||||||
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
|
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
|
||||||
|
|
|
@ -1608,9 +1608,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
|
||||||
if (pInfo->tbnameSlotId != -1) {
|
if (pInfo->tbnameSlotId != -1) {
|
||||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
||||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||||
memcpy(varDataVal(varTbName), name, strlen(name));
|
STR_TO_VARSTR(varTbName, name);
|
||||||
varDataSetLen(varTbName, strlen(name));
|
|
||||||
|
|
||||||
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
|
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -212,6 +212,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||||
int32_t pageId = -1;
|
int32_t pageId = -1;
|
||||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
|
taosArrayDestroy(pPageIdList);
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
|
||||||
|
|
||||||
// access
|
// access
|
||||||
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
|
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
|
||||||
return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
|
return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
|
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
|
||||||
|
|
|
@ -49,39 +49,6 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
|
||||||
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SRaftEntryHashCache {
|
|
||||||
SHashObj* pEntryHash;
|
|
||||||
int32_t maxCount;
|
|
||||||
int32_t currentCount;
|
|
||||||
TdThreadMutex mutex;
|
|
||||||
SSyncNode* pSyncNode;
|
|
||||||
} SRaftEntryHashCache;
|
|
||||||
|
|
||||||
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
|
||||||
void raftCacheDestroy(SRaftEntryHashCache* pCache);
|
|
||||||
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry);
|
|
||||||
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index);
|
|
||||||
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache);
|
|
||||||
|
|
||||||
typedef struct SRaftEntryCache {
|
|
||||||
SSkipList* pSkipList;
|
|
||||||
int32_t maxCount;
|
|
||||||
int32_t currentCount;
|
|
||||||
int32_t refMgr;
|
|
||||||
TdThreadMutex mutex;
|
|
||||||
SSyncNode* pSyncNode;
|
|
||||||
} SRaftEntryCache;
|
|
||||||
|
|
||||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
|
||||||
void raftEntryCacheDestroy(SRaftEntryCache* pCache);
|
|
||||||
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
|
|
||||||
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -104,6 +104,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||||
SRpcMsg rpcRsp = {0};
|
SRpcMsg rpcRsp = {0};
|
||||||
bool accepted = false;
|
bool accepted = false;
|
||||||
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
|
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
|
||||||
|
@ -137,14 +139,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
syncNodeStepDown(ths, pMsg->term);
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
syncNodeResetElectTimer(ths);
|
syncNodeResetElectTimer(ths);
|
||||||
|
|
||||||
if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
|
if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
|
||||||
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
|
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
|
||||||
ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
goto _IGNORE;
|
goto _IGNORE;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||||
|
|
||||||
if (pEntry == NULL) {
|
if (pEntry == NULL) {
|
||||||
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
||||||
goto _IGNORE;
|
goto _IGNORE;
|
||||||
|
@ -191,5 +192,6 @@ _out:
|
||||||
|
|
||||||
_IGNORE:
|
_IGNORE:
|
||||||
rpcFreeCont(rpcRsp.pCont);
|
rpcFreeCont(rpcRsp.pCont);
|
||||||
|
syncEntryDestroy(pEntry);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
|
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
|
|
|
@ -1126,7 +1126,10 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodePreClose(SSyncNode* pSyncNode) {
|
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode != NULL && pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
|
ASSERT(pSyncNode != NULL);
|
||||||
|
ASSERT(pSyncNode->pFsm != NULL);
|
||||||
|
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
|
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
|
||||||
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
|
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
|
||||||
|
@ -1135,20 +1138,6 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
taosMsleep(20);
|
taosMsleep(20);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pSyncNode->pNewNodeReceiver != NULL) {
|
|
||||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
|
||||||
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
|
||||||
}
|
|
||||||
|
|
||||||
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
|
|
||||||
pSyncNode->pNewNodeReceiver);
|
|
||||||
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
|
|
||||||
pSyncNode->pNewNodeReceiver = NULL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// stop elect timer
|
// stop elect timer
|
||||||
syncNodeStopElectTimer(pSyncNode);
|
syncNodeStopElectTimer(pSyncNode);
|
||||||
|
@ -1461,7 +1450,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
||||||
}
|
}
|
||||||
|
|
||||||
// log begin config change
|
// log begin config change
|
||||||
sNInfo(pSyncNode, "begin do config change, from %d to %d", pSyncNode->vgId, oldConfig.replicaNum,
|
sNInfo(pSyncNode, "begin do config change, from %d to %d, replicas:%d", pSyncNode->vgId, oldConfig.replicaNum,
|
||||||
pNewConfig->replicaNum);
|
pNewConfig->replicaNum);
|
||||||
|
|
||||||
if (IamInNew) {
|
if (IamInNew) {
|
||||||
|
@ -1742,8 +1731,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// close receiver
|
// close receiver
|
||||||
if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
|
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||||
snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
|
||||||
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -901,7 +901,7 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
|
||||||
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
|
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
|
||||||
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
|
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
|
||||||
int64_t timeDiffMs = lastSentMs - firstSentMs;
|
int64_t timeDiffMs = lastSentMs - firstSentMs;
|
||||||
if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
|
if (timeDiffMs > 0 && timeDiffMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
|
||||||
pMgr->retryBackoff -= 1;
|
pMgr->retryBackoff -= 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -928,10 +928,6 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
|
||||||
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
|
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
|
||||||
|
|
||||||
return pMgr;
|
return pMgr;
|
||||||
|
|
||||||
_err:
|
|
||||||
taosMemoryFree(pMgr);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
|
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
|
||||||
|
|
|
@ -224,7 +224,7 @@ _OVER:
|
||||||
|
|
||||||
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
|
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
|
||||||
SRaftCfg *pCfg = &pNode->raftCfg;
|
SRaftCfg *pCfg = &pNode->raftCfg;
|
||||||
if (pCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT) {
|
if (pCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -102,344 +102,3 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
|
||||||
SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
|
|
||||||
if (pCache == NULL) {
|
|
||||||
sError("vgId:%d, raft cache create error", pSyncNode->vgId);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCache->pEntryHash =
|
|
||||||
taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
|
||||||
if (pCache->pEntryHash == NULL) {
|
|
||||||
sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexInit(&(pCache->mutex), NULL);
|
|
||||||
pCache->maxCount = maxCount;
|
|
||||||
pCache->currentCount = 0;
|
|
||||||
pCache->pSyncNode = pSyncNode;
|
|
||||||
|
|
||||||
return pCache;
|
|
||||||
}
|
|
||||||
|
|
||||||
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
|
|
||||||
if (pCache != NULL) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
taosHashCleanup(pCache->pEntryHash);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
taosThreadMutexDestroy(&(pCache->mutex));
|
|
||||||
taosMemoryFree(pCache);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// success, return 1
|
|
||||||
// max count, return 0
|
|
||||||
// error, return -1
|
|
||||||
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
|
|
||||||
if (pCache->currentCount >= pCache->maxCount) {
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
|
|
||||||
++(pCache->currentCount);
|
|
||||||
|
|
||||||
sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
|
||||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
|
||||||
pEntry->index, pEntry->bytes);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// success, return 0
|
|
||||||
// error, return -1
|
|
||||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
|
||||||
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
|
||||||
if (ppEntry == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
*ppEntry = NULL;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
|
||||||
if (pTmp != NULL) {
|
|
||||||
SSyncRaftEntry* pEntry = pTmp;
|
|
||||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
|
||||||
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
|
||||||
|
|
||||||
sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
|
||||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
|
||||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// success, return 0
|
|
||||||
// error, return -1
|
|
||||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
|
||||||
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
|
||||||
if (ppEntry == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
*ppEntry = NULL;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
|
||||||
if (pTmp != NULL) {
|
|
||||||
SSyncRaftEntry* pEntry = pTmp;
|
|
||||||
*ppEntry = pEntry;
|
|
||||||
|
|
||||||
sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
|
||||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
|
||||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
|
||||||
--(pCache->currentCount);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
|
||||||
if (ppEntry == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
*ppEntry = NULL;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
|
||||||
if (pTmp != NULL) {
|
|
||||||
SSyncRaftEntry* pEntry = pTmp;
|
|
||||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
|
||||||
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
|
||||||
|
|
||||||
sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
|
||||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
|
||||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
|
||||||
|
|
||||||
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
|
||||||
--(pCache->currentCount);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
taosHashClear(pCache->pEntryHash);
|
|
||||||
pCache->currentCount = 0;
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static char* keyFn(const void* pData) {
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
|
||||||
return (char*)(&(pEntry->index));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
|
||||||
|
|
||||||
static void freeRaftEntry(void* param) {
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
|
||||||
syncEntryDestroy(pEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
|
||||||
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
|
||||||
if (pCache == NULL) {
|
|
||||||
sError("vgId:%d, raft cache create error", pSyncNode->vgId);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCache->pSkipList =
|
|
||||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
|
||||||
if (pCache->pSkipList == NULL) {
|
|
||||||
sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexInit(&(pCache->mutex), NULL);
|
|
||||||
pCache->refMgr = taosOpenRef(10, freeRaftEntry);
|
|
||||||
pCache->maxCount = maxCount;
|
|
||||||
pCache->currentCount = 0;
|
|
||||||
pCache->pSyncNode = pSyncNode;
|
|
||||||
|
|
||||||
return pCache;
|
|
||||||
}
|
|
||||||
|
|
||||||
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
|
|
||||||
if (pCache != NULL) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
tSkipListDestroy(pCache->pSkipList);
|
|
||||||
if (pCache->refMgr != -1) {
|
|
||||||
taosCloseRef(pCache->refMgr);
|
|
||||||
pCache->refMgr = -1;
|
|
||||||
}
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
taosThreadMutexDestroy(&(pCache->mutex));
|
|
||||||
taosMemoryFree(pCache);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// success, return 1
|
|
||||||
// max count, return 0
|
|
||||||
// error, return -1
|
|
||||||
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
|
|
||||||
if (pCache->currentCount >= pCache->maxCount) {
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSkipListNode* pSkipListNode = tSkipListPut(pCache->pSkipList, pEntry);
|
|
||||||
ASSERT(pSkipListNode != NULL);
|
|
||||||
++(pCache->currentCount);
|
|
||||||
|
|
||||||
pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
|
|
||||||
ASSERT(pEntry->rid >= 0);
|
|
||||||
|
|
||||||
sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
|
||||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
|
||||||
pEntry->index, pEntry->bytes);
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// find one, return 1
|
|
||||||
// not found, return 0
|
|
||||||
// error, return -1
|
|
||||||
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
|
||||||
ASSERT(ppEntry != NULL);
|
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
|
||||||
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
|
||||||
if (code == 1) {
|
|
||||||
int32_t bytes = (int32_t)pEntry->bytes;
|
|
||||||
*ppEntry = taosMemoryMalloc((int64_t)bytes);
|
|
||||||
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
|
||||||
(*ppEntry)->rid = -1;
|
|
||||||
} else {
|
|
||||||
*ppEntry = NULL;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// find one, return 1
|
|
||||||
// not found, return 0
|
|
||||||
// error, return -1
|
|
||||||
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
|
|
||||||
SyncIndex index2 = index;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SArray* entryPArray = tSkipListGet(pCache->pSkipList, (char*)(&index2));
|
|
||||||
int32_t arraySize = taosArrayGetSize(entryPArray);
|
|
||||||
if (arraySize == 1) {
|
|
||||||
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
|
||||||
ASSERT(*ppNode != NULL);
|
|
||||||
*ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
|
||||||
taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
|
|
||||||
code = 1;
|
|
||||||
|
|
||||||
} else if (arraySize == 0) {
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
taosArrayDestroy(entryPArray);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// count = -1, clear all
|
|
||||||
// count >= 0, clear count
|
|
||||||
// return -1, error
|
|
||||||
// return delete count
|
|
||||||
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
|
||||||
taosThreadMutexLock(&pCache->mutex);
|
|
||||||
int32_t returnCnt = 0;
|
|
||||||
|
|
||||||
if (count == -1) {
|
|
||||||
// clear all
|
|
||||||
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
|
||||||
ASSERT(pNode != NULL);
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
|
||||||
syncEntryDestroy(pEntry);
|
|
||||||
++returnCnt;
|
|
||||||
}
|
|
||||||
tSkipListDestroyIter(pIter);
|
|
||||||
|
|
||||||
tSkipListDestroy(pCache->pSkipList);
|
|
||||||
pCache->pSkipList =
|
|
||||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
|
||||||
ASSERT(pCache->pSkipList != NULL);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// clear count
|
|
||||||
int i = 0;
|
|
||||||
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
|
||||||
SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));
|
|
||||||
|
|
||||||
// free entry
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
|
||||||
ASSERT(pNode != NULL);
|
|
||||||
if (i++ >= count) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// sDebug("push pNode:%p", pNode);
|
|
||||||
taosArrayPush(delNodeArray, &pNode);
|
|
||||||
++returnCnt;
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
|
||||||
|
|
||||||
// syncEntryDestroy(pEntry);
|
|
||||||
taosRemoveRef(pCache->refMgr, pEntry->rid);
|
|
||||||
}
|
|
||||||
tSkipListDestroyIter(pIter);
|
|
||||||
|
|
||||||
// delete skiplist node
|
|
||||||
int32_t arraySize = taosArrayGetSize(delNodeArray);
|
|
||||||
for (int32_t i = 0; i < arraySize; ++i) {
|
|
||||||
SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
|
|
||||||
// sDebug("get pNode:%p", *ppNode);
|
|
||||||
tSkipListRemoveNode(pCache->pSkipList, *ppNode);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(delNodeArray);
|
|
||||||
}
|
|
||||||
|
|
||||||
pCache->currentCount -= returnCnt;
|
|
||||||
taosThreadMutexUnlock(&pCache->mutex);
|
|
||||||
return returnCnt;
|
|
||||||
}
|
|
||||||
|
|
|
@ -168,17 +168,19 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
if (pSender->blockLen > 0) {
|
if (pSender->blockLen > 0) {
|
||||||
// has read data
|
// has read data
|
||||||
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
|
sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
|
||||||
|
pSender->blockLen, pSender->seq);
|
||||||
} else {
|
} else {
|
||||||
// read finish, update seq to end
|
// read finish, update seq to end
|
||||||
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
|
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
|
||||||
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
|
sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
|
||||||
|
pSender->blockLen, pSender->seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// build msg
|
// build msg
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
|
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||||
sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,11 +342,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
||||||
taosMemoryFree(pReceiver);
|
taosMemoryFree(pReceiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
|
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
return (pReceiver != NULL ? pReceiver->start : false);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
|
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -851,8 +855,8 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
||||||
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
|
|
||||||
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
|
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
|
||||||
sSError(pSender, "prepare snapshot failed since beginIndex:%d larger than applyIndex:%d", pMsg->snapBeginIndex,
|
sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
|
||||||
snapshot.lastApplyIndex);
|
pMsg->snapBeginIndex, snapshot.lastApplyIndex);
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -966,7 +970,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pSender->pReader == NULL || pSender->finish) {
|
if (pSender->pReader == NULL || pSender->finish) {
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
|
||||||
sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
|
sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
|
||||||
|
pSender->pReader, pSender->finish);
|
||||||
terrno = pMsg->code;
|
terrno = pMsg->code;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
|
@ -913,7 +913,7 @@ int walLoadMeta(SWal* pWal) {
|
||||||
int64_t fileSize = 0;
|
int64_t fileSize = 0;
|
||||||
taosStatFile(fnameStr, &fileSize, NULL);
|
taosStatFile(fnameStr, &fileSize, NULL);
|
||||||
if (fileSize == 0) {
|
if (fileSize == 0) {
|
||||||
taosRemoveFile(fnameStr);
|
(void)taosRemoveFile(fnameStr);
|
||||||
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
walRemoveMeta(pWal);
|
(void)walRemoveMeta(pWal);
|
||||||
|
|
||||||
pWal->writeCur = -1;
|
pWal->writeCur = -1;
|
||||||
pWal->totSize = 0;
|
pWal->totSize = 0;
|
||||||
|
|
|
@ -391,7 +391,7 @@ _error:
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* doExtractPage(SDiskbasedBuf* pBuf) {
|
static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) {
|
||||||
char* availablePage = NULL;
|
char* availablePage = NULL;
|
||||||
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
||||||
availablePage = evictBufPage(pBuf);
|
availablePage = evictBufPage(pBuf);
|
||||||
|
@ -405,6 +405,7 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
|
||||||
if (availablePage == NULL) {
|
if (availablePage == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
*newPage = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return availablePage;
|
return availablePage;
|
||||||
|
@ -413,7 +414,8 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
|
||||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||||
pBuf->statis.getPages += 1;
|
pBuf->statis.getPages += 1;
|
||||||
|
|
||||||
char* availablePage = doExtractPage(pBuf);
|
bool newPage = false;
|
||||||
|
char* availablePage = doExtractPage(pBuf, &newPage);
|
||||||
if (availablePage == NULL) {
|
if (availablePage == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -432,6 +434,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||||
// register page id info
|
// register page id info
|
||||||
pi = registerNewPageInfo(pBuf, *pageId);
|
pi = registerNewPageInfo(pBuf, *pageId);
|
||||||
if (pi == NULL) {
|
if (pi == NULL) {
|
||||||
|
if (newPage) {
|
||||||
|
taosMemoryFree(availablePage);
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -492,7 +497,8 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
|
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
|
||||||
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
||||||
|
|
||||||
(*pi)->pData = doExtractPage(pBuf);
|
bool newPage = false;
|
||||||
|
(*pi)->pData = doExtractPage(pBuf, &newPage);
|
||||||
|
|
||||||
// failed to evict buffer page, return with error code.
|
// failed to evict buffer page, return with error code.
|
||||||
if ((*pi)->pData == NULL) {
|
if ((*pi)->pData == NULL) {
|
||||||
|
@ -509,6 +515,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
if (HAS_DATA_IN_DISK(*pi)) {
|
if (HAS_DATA_IN_DISK(*pi)) {
|
||||||
int32_t code = loadPageFromDisk(pBuf, *pi);
|
int32_t code = loadPageFromDisk(pBuf, *pi);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
if (newPage) {
|
||||||
|
taosMemoryFree((*pi)->pData);
|
||||||
|
}
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
|
||||||
if ((*num) >= size) {
|
if ((*num) >= size) {
|
||||||
size = (size << 1);
|
size = (size << 1);
|
||||||
split = taosMemoryRealloc(split, POINTER_BYTES * size);
|
split = taosMemoryRealloc(split, POINTER_BYTES * size);
|
||||||
ASSERTS(NULL != split, "realloc memory failed. size=%d", POINTER_BYTES * size);
|
ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t) POINTER_BYTES * size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -301,7 +301,7 @@ print $data00, $data01, $data02, $data03
|
||||||
print $data10, $data11, $data12, $data13
|
print $data10, $data11, $data12, $data13
|
||||||
print $data20, $data21, $data22, $data23
|
print $data20, $data21, $data22, $data23
|
||||||
|
|
||||||
loop2:
|
loop3:
|
||||||
|
|
||||||
sleep 300
|
sleep 300
|
||||||
|
|
||||||
|
@ -317,47 +317,182 @@ if $rows != 2 then
|
||||||
print $data00, $data01, $data02, $data03
|
print $data00, $data01, $data02, $data03
|
||||||
print $data10, $data11, $data12, $data13
|
print $data10, $data11, $data12, $data13
|
||||||
print $data20, $data21, $data22, $data23
|
print $data20, $data21, $data22, $data23
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data01 != 10 then
|
if $data01 != 10 then
|
||||||
print =====data01=$data01
|
print =====data01=$data01
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data02 != 20 then
|
if $data02 != 20 then
|
||||||
print =====data02=$data02
|
print =====data02=$data02
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data03 != 1 then
|
if $data03 != 1 then
|
||||||
print =====data03=$data03
|
print =====data03=$data03
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data04 != NULL then
|
if $data04 != NULL then
|
||||||
print =====data04=$data04
|
print =====data04=$data04
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data11 != 40 then
|
if $data11 != 40 then
|
||||||
print =====data11=$data11
|
print =====data11=$data11
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data12 != 50 then
|
if $data12 != 50 then
|
||||||
print =====data12=$data12
|
print =====data12=$data12
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data13 != 1 then
|
if $data13 != 1 then
|
||||||
print =====data13=$data13
|
print =====data13=$data13
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data14 != NULL then
|
if $data14 != NULL then
|
||||||
print =====data14=$data14
|
print =====data14=$data14
|
||||||
goto loop2
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ===== step7
|
||||||
|
|
||||||
|
sql create database result5 vgroups 1;
|
||||||
|
|
||||||
|
sql create database test5 vgroups 4;
|
||||||
|
sql use test5;
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,2,3);
|
||||||
|
sql create table t2 using st tags(4,5,6);
|
||||||
|
|
||||||
|
sql create stable result5.streamt5(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int);
|
||||||
|
|
||||||
|
sql create stream streams5 trigger at_once into result5.streamt5(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg3 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,NULL,NULL,NULL);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
print select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
|
||||||
|
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
|
||||||
|
print $data00, $data01, $data02, $data03
|
||||||
|
print $data10, $data11, $data12, $data13
|
||||||
|
print $data20, $data21, $data22, $data23
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print sql select * from result5.streamt5 order by tg1;
|
||||||
|
sql select * from result5.streamt5 order by tg1;
|
||||||
|
print $data00, $data01, $data02, $data03 $data04 $data05 $data06 $data07
|
||||||
|
print $data10, $data11, $data12, $data13
|
||||||
|
print $data20, $data21, $data22, $data23
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != NULL then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 1 then
|
||||||
|
print =====data03=$data03
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data04 != NULL then
|
||||||
|
print =====data04=$data04
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data05 != 2 then
|
||||||
|
print =====data05=$data05
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data06 != 2 then
|
||||||
|
print =====data06=$data06
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data07 != NULL then
|
||||||
|
print =====data07=$data07
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql drop stream if exists streams4;
|
||||||
|
sql drop stream if exists streams5;
|
||||||
|
sql drop database if exists test4;
|
||||||
|
sql drop database if exists test5;
|
||||||
|
sql drop database if exists result4;
|
||||||
|
sql drop database if exists result5;
|
||||||
|
|
||||||
|
print ===== step8
|
||||||
|
|
||||||
|
sql drop stream if exists streams8;
|
||||||
|
sql drop database if exists test8;
|
||||||
|
sql create database test8 vgroups 1;
|
||||||
|
sql use test8;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams8 trigger at_once into streamt8 as select _wstart as ts, count(*) c1, count(d) c2, count(c) c3 from t1 partition by tbname interval(10s) ;
|
||||||
|
|
||||||
|
sql drop stream streams8;
|
||||||
|
sql create stream streams71 trigger at_once into streamt8(ts, c2) tags(group_id)as select _wstart, count(*) from t1 partition by tbname as group_id interval(10s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,1,2,3,1.0);
|
||||||
|
|
||||||
|
loop8:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt8;
|
||||||
|
print $data00, $data01, $data02, $data03
|
||||||
|
print $data10, $data11, $data12, $data13
|
||||||
|
print $data20, $data21, $data22, $data23
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != NULL then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != NULL then
|
||||||
|
print =====data03=$data03
|
||||||
|
goto loop8
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ======over
|
print ======over
|
||||||
|
|
|
@ -39,7 +39,10 @@ sql select table_name from information_schema.ins_tables where db_name="result"
|
||||||
|
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
print $data00 $data10
|
print $data00
|
||||||
|
print $data10
|
||||||
|
print $data20
|
||||||
|
print $data30
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -1381,7 +1381,7 @@ void startOmbConsume() {
|
||||||
printf("SQL: %s\n", sql);
|
printf("SQL: %s\n", sql);
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
|
|
||||||
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
|
int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers);
|
||||||
|
|
||||||
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
||||||
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
||||||
|
|
|
@ -31,7 +31,7 @@ void simLogSql(char *sql, bool useSharp) {
|
||||||
taosFprintfFile(pFile, "%s;\n", sql);
|
taosFprintfFile(pFile, "%s;\n", sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFsyncFile(pFile);
|
UNUSED(taosFsyncFile(pFile));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue