Merge pull request #15680 from taosdata/fix/dnode
refactor: adjust logs
This commit is contained in:
commit
590266067b
|
@ -961,7 +961,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||||
if (pTmq == NULL) {
|
if (pTmq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||||
|
pTmq->groupId);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -979,7 +980,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||||
|
pTmq->groupId);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1008,14 +1010,16 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
// init semaphore
|
// init semaphore
|
||||||
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
||||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||||
|
pTmq->groupId);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init connection
|
// init connection
|
||||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
||||||
if (pTmq->pTscObj == NULL) {
|
if (pTmq->pTscObj == NULL) {
|
||||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||||
|
pTmq->groupId);
|
||||||
tsem_destroy(&pTmq->rspSem);
|
tsem_destroy(&pTmq->rspSem);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
@ -1024,7 +1028,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
|
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscInfo("consumer %ld is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
|
tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
|
||||||
|
|
||||||
return pTmq;
|
return pTmq;
|
||||||
|
|
||||||
|
|
|
@ -1792,8 +1792,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d|version:%" PRIu64 "\n", flag,
|
len += snprintf(dumpBuf + len, size - len,
|
||||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||||
|
"|rows:%d|version:%" PRIu64 "\n",
|
||||||
|
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||||
pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
||||||
|
|
|
@ -878,14 +878,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
||||||
mDebug("showing consumer %ld no assigned topic, skip", pConsumer->consumerId);
|
mDebug("showing consumer %" PRId64 " no assigned topic, skip", pConsumer->consumerId);
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
|
||||||
mDebug("showing consumer %ld", pConsumer->consumerId);
|
mDebug("showing consumer %" PRId64, pConsumer->consumerId);
|
||||||
|
|
||||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
bool hasTopic = true;
|
bool hasTopic = true;
|
||||||
|
|
|
@ -204,12 +204,12 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfSTables;
|
++pMeta->pVnode->config.vndStats.numOfSTables;
|
||||||
|
|
||||||
metaDebug("vgId:%d, super table is created, name:%s uid:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
metaDebug("vgId:%d, stb:%s is created, suid:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
metaError("vgId:%d, failed to create super table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
|
metaError("vgId:%d, failed to create stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
|
||||||
pReq->suid, tstrerror(terrno));
|
pReq->suid, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -411,7 +411,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
|
|
||||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
|
|
||||||
metaDebug("vgId:%d, table %s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
|
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
|
||||||
pReq->type);
|
pReq->type);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
|
@ -317,7 +317,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||||
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
|
@ -348,8 +348,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, offset reset to %ld", consumerId, pHandle->subKey,
|
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
|
||||||
TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
@ -398,7 +398,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
tqWarn("tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %" PRId64
|
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
", found new consumer epoch %d, discard req epoch %d",
|
||||||
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
|
@ -595,7 +595,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer %ld", req.subKey, pHandle->consumerId);
|
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -714,7 +714,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
if (!pTask->isDataScan) continue;
|
if (!pTask->isDataScan) continue;
|
||||||
|
|
||||||
qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
|
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
|
||||||
|
|
||||||
if (!failed) {
|
if (!failed) {
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
||||||
|
|
|
@ -111,7 +111,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
|
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
||||||
|
pHandle->snapshotVer + 1);
|
||||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||||
qStreamPrepareScan(task, pOffset);
|
qStreamPrepareScan(task, pOffset);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
handle.execHandle.execDb.pFilterOutTbUid =
|
handle.execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %ld vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||||
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %ld vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
||||||
TD_VID(pTq->pVnode));
|
TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
|
|
|
@ -137,7 +137,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
tqDebug("return offset %ld, no more valid", ret->offset.version);
|
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
|
||||||
ASSERT(ret->offset.version >= 0);
|
ASSERT(ret->offset.version >= 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ASSERT(pReader->ver >= 0);
|
ASSERT(pReader->ver >= 0);
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
tqDebug("return offset %ld, processed finish", ret->offset.version);
|
tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -904,8 +904,7 @@ _exit:
|
||||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("successful submit in vg %d version %ld", pVnode->config.vgId, version);
|
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -605,7 +605,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
#if 0
|
#if 0
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
||||||
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
||||||
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
|
qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
|
||||||
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -636,8 +636,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
|
||||||
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
|
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows);
|
pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -669,8 +669,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
pTableScanInfo->cond.twindows.skey = oldSkey;
|
pTableScanInfo->cond.twindows.skey = oldSkey;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||||
pTableScanInfo->currentTable, tableSz);
|
ts, pTableScanInfo->currentTable, tableSz);
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -3992,7 +3992,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
|
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
qDebug("creating stream task: add table %ld", pKeyInfo->uid);
|
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -2411,9 +2411,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
} else {
|
} else {
|
||||||
int32_t colLen = kv->length;
|
int32_t colLen = kv->length;
|
||||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
// uError("SML:data before:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
// uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||||
// uError("SML:data after:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
// uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(kv->type)) {
|
if (IS_VAR_DATA_TYPE(kv->type)) {
|
||||||
|
|
|
@ -143,7 +143,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
|
|
||||||
// enqueue
|
// enqueue
|
||||||
if (pData != NULL) {
|
if (pData != NULL) {
|
||||||
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %ld", pTask->taskId, pTask->selfChildId,
|
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
|
||||||
pReq->srcTaskId, pReq->reqId);
|
pReq->srcTaskId, pReq->reqId);
|
||||||
|
|
||||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||||
|
|
|
@ -159,8 +159,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %ld", pTask->taskId, pTask->selfChildId,
|
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId,
|
||||||
pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
|
|
|
@ -59,7 +59,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
block.info.childId = pTask->selfChildId;
|
block.info.childId = pTask->selfChildId;
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
qDebug("task %d(child %d) processed retrieve, reqId %ld", pTask->taskId, pTask->selfChildId,
|
qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
|
||||||
pRetrieveBlock->reqId);
|
pRetrieveBlock->reqId);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -93,7 +93,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
do {
|
do {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex,
|
||||||
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
@ -107,7 +108,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
|
||||||
if (beginIndex > endIndex) {
|
if (beginIndex > endIndex) {
|
||||||
do {
|
do {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "snapshot param error, start:%ld, end:%ld", beginIndex, endIndex);
|
snprintf(logBuf, sizeof(logBuf), "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -293,7 +294,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
||||||
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
do {
|
do {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex,
|
||||||
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
@ -392,7 +394,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
do {
|
do {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex,
|
||||||
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
|
@ -82,8 +82,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
} else {
|
} else {
|
||||||
do {
|
do {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%ld, term:%lu", pEntry->index,
|
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%" PRId64 ", term:%" PRIu64,
|
||||||
pEntry->term);
|
pEntry->index, pEntry->term);
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2238,7 +2238,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "sync node get pre term error, index:%ld, snap-index:%ld, snap-term:%lu", index,
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRIu64, index,
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
syncNodeErrorLog(pSyncNode, logBuf);
|
syncNodeErrorLog(pSyncNode, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
|
@ -82,12 +82,12 @@ void test2() {
|
||||||
|
|
||||||
code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
||||||
ASSERT(code == 1 && index == pEntry->index);
|
ASSERT(code == 1 && index == pEntry->index);
|
||||||
sTrace("get entry:%p for %ld", pEntry, index);
|
sTrace("get entry:%p for %" PRId64, pEntry, index);
|
||||||
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
||||||
|
|
||||||
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
ASSERT(code == 1 && index == pEntry->index);
|
ASSERT(code == 1 && index == pEntry->index);
|
||||||
sTrace("get entry:%p for %ld", pEntry, index);
|
sTrace("get entry:%p for %" PRId64, pEntry, index);
|
||||||
syncEntryLog2((char*)"==test2 get entry 2==", pEntry);
|
syncEntryLog2((char*)"==test2 get entry 2==", pEntry);
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
|
@ -95,14 +95,14 @@ void test2() {
|
||||||
index = 8;
|
index = 8;
|
||||||
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
sTrace("get entry:%p for %ld", pEntry, index);
|
sTrace("get entry:%p for %" PRId64, pEntry, index);
|
||||||
sTrace("==test2 get entry 8 not found==");
|
sTrace("==test2 get entry 8 not found==");
|
||||||
|
|
||||||
// not found
|
// not found
|
||||||
index = 9;
|
index = 9;
|
||||||
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
sTrace("get entry:%p for %ld", pEntry, index);
|
sTrace("get entry:%p for %" PRId64, pEntry, index);
|
||||||
sTrace("==test2 get entry 9 not found==");
|
sTrace("==test2 get entry 9 not found==");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ void test4() {
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
int64_t rid = taosAddRef(testRefId, pEntry);
|
int64_t rid = taosAddRef(testRefId, pEntry);
|
||||||
sTrace("rid: %ld", rid);
|
sTrace("rid: %" PRId64, rid);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid);
|
SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid);
|
||||||
|
@ -164,7 +164,7 @@ void test5() {
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
int64_t rid = taosAddRef(testRefId, pEntry);
|
int64_t rid = taosAddRef(testRefId, pEntry);
|
||||||
sTrace("rid: %ld", rid);
|
sTrace("rid: %" PRId64, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int64_t rid = 2; rid < 101; rid++) {
|
for (int64_t rid = 2; rid < 101; rid++) {
|
||||||
|
|
|
@ -194,13 +194,13 @@ SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(entryPArray);
|
taosArrayDestroy(entryPArray);
|
||||||
|
|
||||||
sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize);
|
sTrace("get index2: %" PRId64 ", arraySize:%d -------------", index, arraySize);
|
||||||
syncEntryLog2((char*)"getLogEntry2", pEntry);
|
syncEntryLog2((char*)"getLogEntry2", pEntry);
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) {
|
SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) {
|
||||||
sTrace("get index: %ld -------------", index);
|
sTrace("get index: %" PRId64 " -------------", index);
|
||||||
SyncIndex index2 = index;
|
SyncIndex index2 = index;
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
SSkipListIterator* pIter =
|
SSkipListIterator* pIter =
|
||||||
|
|
|
@ -78,7 +78,8 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||||
endVer = TMIN(appliedVer, endVer);
|
endVer = TMIN(appliedVer, endVer);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||||
|
", applied index:%" PRId64 ", end index:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||||
pReader->curStopped = 0;
|
pReader->curStopped = 0;
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= endVer) {
|
||||||
|
@ -190,7 +191,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid: %d) to %" PRId64, pReader->pWal->cfg.vgId,
|
wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
pReader->curVersion, pReader->curInvalid, ver);
|
pReader->curVersion, pReader->curInvalid, ver);
|
||||||
|
|
||||||
pReader->curVersion = ver;
|
pReader->curVersion = ver;
|
||||||
|
@ -200,7 +201,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
|
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
|
||||||
SWal *pWal = pReader->pWal;
|
SWal *pWal = pReader->pWal;
|
||||||
if (!pReader->curInvalid && ver == pReader->curVersion) {
|
if (!pReader->curInvalid && ver == pReader->curVersion) {
|
||||||
wDebug("vgId:%d, wal version %" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,7 +230,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal starts to fetch head %d", pRead->pWal->cfg.vgId, fetchVer);
|
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
|
||||||
|
|
||||||
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||||
|
@ -267,7 +268,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
SWalCont *pReadHead = &pRead->pHead->head;
|
SWalCont *pReadHead = &pRead->pHead->head;
|
||||||
int64_t ver = pReadHead->version;
|
int64_t ver = pReadHead->version;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal starts to fetch body %ld", pRead->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
if (pRead->capacity < pReadHead->bodyLen) {
|
if (pRead->capacity < pReadHead->bodyLen) {
|
||||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
|
@ -312,7 +313,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, version %" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curVersion = ver + 1;
|
pRead->curVersion = ver + 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -406,7 +407,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pRead->pHead->head.version, ver);
|
pRead->pHead->head.version, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
@ -414,7 +415,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walValidBodyCksum(*ppHead) != 0) {
|
if (walValidBodyCksum(*ppHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -425,7 +426,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
wDebug("vgId:%d, wal start to read ver %ld", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
|
Loading…
Reference in New Issue