diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index eb5db9d639..c11651970c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -61,45 +61,23 @@ extern "C" { } \ } -#define WAL_HEAD_VER 0 +#define WAL_PROTO_VER 0 #define WAL_NOSUFFIX_LEN 20 #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) #define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" #define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDULL -#pragma pack(push, 1) typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2, } EWalType; -// used by sync module -typedef struct { - int8_t isWeek; - uint64_t seqNum; - uint64_t term; -} SSyncLogMeta; - -typedef struct SWalReadHead { - int8_t headVer; - int8_t reserved; - int16_t msgType; - int32_t bodyLen; - int64_t ingestTs; // not implemented - int64_t version; - - // sync meta - SSyncLogMeta syncMeta; - - char body[]; -} SWalReadHead; - typedef struct { int32_t vgId; int32_t fsyncPeriod; // millisecond @@ -110,13 +88,6 @@ typedef struct { EWalType level; // wal level } SWalCfg; -typedef struct { - uint64_t magic; - uint32_t cksumHead; - uint32_t cksumBody; - SWalReadHead head; -} SWalHead; - typedef struct SWalVer { int64_t firstVer; int64_t verInSnapshotting; @@ -125,6 +96,35 @@ typedef struct SWalVer { int64_t lastVer; } SWalVer; +#pragma pack(push, 1) +// used by sync module +typedef struct { + int8_t isWeek; + uint64_t seqNum; + uint64_t term; +} SSyncLogMeta; + +typedef struct { + int8_t protoVer; + int64_t version; + int16_t msgType; + int32_t bodyLen; + int64_t ingestTs; // not implemented + + // sync meta + SSyncLogMeta syncMeta; + + char body[]; +} SWalCont; + +typedef struct { + uint64_t magic; + uint32_t cksumHead; + uint32_t cksumBody; + SWalCont head; +} SWalCkHead; +#pragma pack(pop) + typedef struct SWal { // cfg SWalCfg cfg; @@ -134,7 +134,7 @@ typedef struct SWal { TdFilePtr pWriteLogTFile; TdFilePtr pWriteIdxTFile; int32_t writeCur; - SArray *fileInfoSet; + SArray *fileInfoSet; // SArray // status int64_t totSize; int64_t lastRollSeq; @@ -146,7 +146,7 @@ typedef struct SWal { // path char path[WAL_PATH_LEN]; // reusable write head - SWalHead writeHead; + SWalCkHead writeHead; } SWal; // WAL HANDLE typedef struct SWalReadHandle { @@ -158,11 +158,8 @@ typedef struct SWalReadHandle { int64_t capacity; int64_t status; // if cursor valid TdThreadMutex mutex; - SWalHead *pHead; + SWalCkHead *pHead; } SWalReadHandle; -#pragma pack(pop) - -// typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization int32_t walInit(); @@ -174,9 +171,9 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -int64_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, +int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, int32_t bodyLen); -int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); +int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); // apis for lifecycle management @@ -196,9 +193,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); // only for tq usage void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); -int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead); -int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead); -int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead); +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead); +int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead); +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead); typedef struct { int64_t refId; diff --git a/include/os/osFile.h b/include/os/osFile.h index 8751e175a5..2f6a6ba480 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -25,17 +25,17 @@ extern "C" { // If the error is in a third-party library, place this header file under the third-party library header file. // When you want to use this feature, you should find or add the same function in the following sectio #ifndef ALLOW_FORBID_FUNC - #define open OPEN_FUNC_TAOS_FORBID - #define fopen FOPEN_FUNC_TAOS_FORBID - #define access ACCESS_FUNC_TAOS_FORBID - #define stat STAT_FUNC_TAOS_FORBID - #define lstat LSTAT_FUNC_TAOS_FORBID - #define fstat FSTAT_FUNC_TAOS_FORBID - #define close CLOSE_FUNC_TAOS_FORBID - #define fclose FCLOSE_FUNC_TAOS_FORBID - #define fsync FSYNC_FUNC_TAOS_FORBID - #define getline GETLINE_FUNC_TAOS_FORBID - // #define fflush FFLUSH_FUNC_TAOS_FORBID +#define open OPEN_FUNC_TAOS_FORBID +#define fopen FOPEN_FUNC_TAOS_FORBID +#define access ACCESS_FUNC_TAOS_FORBID +#define stat STAT_FUNC_TAOS_FORBID +#define lstat LSTAT_FUNC_TAOS_FORBID +#define fstat FSTAT_FUNC_TAOS_FORBID +#define close CLOSE_FUNC_TAOS_FORBID +#define fclose FCLOSE_FUNC_TAOS_FORBID +#define fsync FSYNC_FUNC_TAOS_FORBID +#define getline GETLINE_FUNC_TAOS_FORBID +// #define fflush FFLUSH_FUNC_TAOS_FORBID #endif #ifndef PATH_MAX @@ -43,54 +43,54 @@ extern "C" { #endif typedef struct TdFile *TdFilePtr; - -#define TD_FILE_CREATE 0x0001 -#define TD_FILE_WRITE 0x0002 -#define TD_FILE_READ 0x0004 -#define TD_FILE_TRUNC 0x0008 -#define TD_FILE_APPEND 0x0010 -#define TD_FILE_TEXT 0x0020 -#define TD_FILE_AUTO_DEL 0x0040 -#define TD_FILE_EXCL 0x0080 -#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile -TdFilePtr taosOpenFile(const char *path,int32_t tdFileOptions); + +#define TD_FILE_CREATE 0x0001 +#define TD_FILE_WRITE 0x0002 +#define TD_FILE_READ 0x0004 +#define TD_FILE_TRUNC 0x0008 +#define TD_FILE_APPEND 0x0010 +#define TD_FILE_TEXT 0x0020 +#define TD_FILE_AUTO_DEL 0x0040 +#define TD_FILE_EXCL 0x0080 +#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile +TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions); #define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_READ_OK 0x2 #define TD_FILE_ACCESS_WRITE_OK 0x4 -bool taosCheckAccessFile(const char *pathname, int mode); - +bool taosCheckAccessFile(const char *pathname, int mode); + int32_t taosLockFile(TdFilePtr pFile); int32_t taosUnLockFile(TdFilePtr pFile); - + int32_t taosUmaskFile(int32_t maskVal); - + int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime); int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno); int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime); bool taosCheckExistFile(const char *pathname); - + int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence); int32_t taosFtruncateFile(TdFilePtr pFile, int64_t length); int32_t taosFsyncFile(TdFilePtr pFile); - + int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count); int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset); int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); void taosFprintfFile(TdFilePtr pFile, const char *format, ...); -int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf); +int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf); int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf); int32_t taosEOFFile(TdFilePtr pFile); - -int64_t taosCloseFile(TdFilePtr *ppFile); - + +int32_t taosCloseFile(TdFilePtr *ppFile); + int32_t taosRenameFile(const char *oldName, const char *newName); int64_t taosCopyFile(const char *from, const char *to); int32_t taosRemoveFile(const char *path); - -void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); + +void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); diff --git a/include/util/tarray.h b/include/util/tarray.h index 482f13de39..e8037699ae 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -139,7 +139,7 @@ void* taosArrayGetLast(const SArray* pArray); * @param pArray * @return */ -size_t taosArrayGetSize(const SArray* pArray); +int32_t taosArrayGetSize(const SArray* pArray); /** * set the size of array diff --git a/include/util/tutil.h b/include/util/tutil.h index b29e1f7cfa..6a1a40f14c 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -44,6 +44,8 @@ uint32_t ip2uint(const char *const ip_addr); void taosIp2String(uint32_t ip, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str); +void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); + static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; tMD5Init(&context); @@ -59,10 +61,10 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar tMD5Final(&context); char buf[TSDB_PASSWORD_LEN + 1]; - sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], - context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], - context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], + context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); memcpy(target, buf, TSDB_PASSWORD_LEN); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cab4805b20..d10935c022 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -157,7 +157,7 @@ typedef struct { static STqMgmt tqMgmt = {0}; // tqRead -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum); +int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); @@ -178,6 +178,7 @@ STqOffsetStore* tqOffsetOpen(); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); +int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetSnapshot(STqOffsetStore* pStore); // tqSink diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8ef66d3ef6..35486a5267 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -271,8 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); if (pOffset != NULL) { fetchOffsetNew = pOffset->val; - char formatBuf[50]; - tFormatOffset(formatBuf, 50, &fetchOffsetNew); + char formatBuf[80]; + tFormatOffset(formatBuf, 80, &fetchOffsetNew); tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { @@ -302,9 +302,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = fetchOffsetNew.version + 1; - SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); - if (pHeadWithCkSum == NULL) { + int64_t fetchVer = fetchOffsetNew.version + 1; + SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { return -1; } @@ -318,7 +318,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { // TODO add push mgr tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); @@ -329,7 +329,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { goto OVER; } - SWalReadHead* pHead = &pHeadWithCkSum->head; + SWalCont* pHead = &pCkHead->head; tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); @@ -373,9 +373,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } } - taosMemoryFree(pHeadWithCkSum); + taosMemoryFree(pCkHead); } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); + tqInfo("retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld", dataRsp.reqOffset.uid, + dataRsp.reqOffset.ts, fetchOffsetNew.uid, fetchOffsetNew.ts); if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { ASSERT(0); } @@ -522,7 +523,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { break; } - SWalReadHead* pHead = &pHeadWithCkSum->head; + SWalCont* pHead = &pHeadWithCkSum->head; tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); @@ -597,6 +598,8 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); + tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 16b8872098..ef61897f91 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -97,6 +97,10 @@ int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); } +int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { + return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey)); +} + int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { // open file // TODO file name should be with a version diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7d7b9636df..9e5c67fed4 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,13 +15,13 @@ #include "tq.h" -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) { +int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { - if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) { + if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); *fetchOffset = offset - 1; @@ -29,8 +29,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* goto END; } - if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) { - code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); + if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { + code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -43,9 +43,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* goto END; } else { if (pHandle->fetchMeta) { - SWalReadHead* pHead = &((*ppHeadWithCkSum)->head); + SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { - code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); + code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -58,7 +58,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* goto END; } } - code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum); + code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); if (code < 0) { ASSERT(0); *fetchOffset = offset; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9872f26b03..d1d2c55acf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -287,6 +287,7 @@ typedef struct STableScanInfo { } lastStatus; int8_t scanMode; + int8_t noTable; } STableScanInfo; typedef struct STagScanInfo { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 8dd8005225..2d5ccf8568 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -235,9 +235,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (uid == 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } } return doPrepareScan(pTaskInfo->pRoot, uid, ts); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 61bf73c69c..c7dd288617 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2829,21 +2829,28 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; + if (uid == 0) { + pInfo->noTable = 1; + return TSDB_CODE_SUCCESS; + } /*if (pSnapShotScanInfo->dataReader == NULL) {*/ /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ /*}*/ + pInfo->noTable = 0; + if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { tsdbSetTableId(pInfo->dataReader, uid); int64_t oldSkey = pInfo->cond.twindows[0].skey; - pInfo->cond.twindows[0].skey = ts; + pInfo->cond.twindows[0].skey = ts + 1; tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); pInfo->cond.twindows[0].skey = oldSkey; pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; } + return TSDB_CODE_SUCCESS; } else { if (pOperator->numOfDownstream == 1) { @@ -2856,8 +2863,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { return TSDB_CODE_QRY_APP_ERROR; } } - - return TSDB_CODE_SUCCESS; } int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 504618d8a4..89a47f4e2c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -518,6 +518,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { + if (pInfo->noTable) return NULL; while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e9b311db9a..529615d4fd 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -88,3 +88,12 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } + +void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { + int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); + ASSERT(ref >= 0); + if (ref == 0) { + taosMemoryFree(pDataSubmit->data); + taosMemoryFree(pDataSubmit->dataRef); + } +} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b3be35fba3..d178c19615 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -63,7 +63,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) continue; } - // TODO: do we need free memory? SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d5a4da60f5..f82ef1b42f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -41,12 +41,3 @@ void streamQueueClose(SStreamQueue* queue) { return; } } - -void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { - int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - taosMemoryFree(pDataSubmit->data); - taosMemoryFree(pDataSubmit->dataRef); - } -} diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 7ca105ff2b..c23d0802c1 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -27,7 +27,7 @@ extern "C" { #endif // meta section begin -typedef struct WalFileInfo { +typedef struct { int64_t firstVer; int64_t lastVer; int64_t createTs; @@ -98,20 +98,20 @@ static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) { return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); } -static inline int walValidHeadCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead); +static inline int walValidHeadCksum(SWalCkHead* pHead) { + return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalCont), pHead->cksumHead); } -static inline int walValidBodyCksum(SWalHead* pHead) { +static inline int walValidBodyCksum(SWalCkHead* pHead) { return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody); } -static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { +static inline int walValidCksum(SWalCkHead* pHead, void* body, int64_t bodyLen) { return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); } -static inline uint32_t walCalcHeadCksum(SWalHead* pHead) { - return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead)); +static inline uint32_t walCalcHeadCksum(SWalCkHead* pHead) { + return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalCont)); } static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 342ab7b152..313fd06c8e 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -16,7 +16,7 @@ #include "cJSON.h" #include "os.h" #include "taoserror.h" -#include "tref.h" +#include "tutil.h" #include "walInt.h" bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { @@ -37,26 +37,9 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } -void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { - char* limit; - - if (nlen == 0 || hlen < nlen) { - return NULL; - } - - limit = haystack + hlen - nlen + 1; - while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) { - if (memcmp(haystack, needle, nlen) == 0) { - return haystack; - } - haystack++; - } - return NULL; -} - static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ASSERT(pWal->fileInfoSet != NULL); - int sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); ASSERT(sz > 0); #if 0 for (int i = 0; i < sz; i++) { @@ -101,14 +84,14 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { char* candidate; while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { // read and validate - SWalHead* logContent = (SWalHead*)candidate; + SWalCkHead* logContent = (SWalCkHead*)candidate; if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { found = candidate; } haystack = candidate + 1; } if (found == buf) { - SWalHead* logContent = (SWalHead*)found; + SWalCkHead* logContent = (SWalCkHead*)found; if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { // file has to be deleted taosMemoryFree(buf); @@ -118,7 +101,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { } } taosCloseFile(&pFile); - SWalHead* lastEntry = (SWalHead*)found; + SWalCkHead* lastEntry = (SWalCkHead*)found; return lastEntry->head.version; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 9505b02806..491b982968 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -117,8 +117,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->lastRollSeq = -1; // init write buffer - memset(&pWal->writeHead, 0, sizeof(SWalHead)); - pWal->writeHead.head.headVer = WAL_HEAD_VER; + memset(&pWal->writeHead, 0, sizeof(SWalCkHead)); + pWal->writeHead.head.protoVer = WAL_PROTO_VER; pWal->writeHead.magic = WAL_MAGIC; if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 2de0fea9ac..e7f0b31ccc 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -33,7 +33,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { taosThreadMutexInit(&pRead->mutex, NULL); - pRead->pHead = taosMemoryMalloc(sizeof(SWalHead)); + pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); if (pRead->pHead == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; taosMemoryFree(pRead); @@ -155,7 +155,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } -int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) { int64_t code; // TODO: valid ver @@ -170,8 +170,8 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { ASSERT(taosValidFile(pRead->pReadLogTFile) == true); - code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); - if (code != sizeof(SWalHead)) { + code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead)); + if (code != sizeof(SWalCkHead)) { return -1; } @@ -186,7 +186,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { return 0; } -int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) { int64_t code; ASSERT(pRead->curVersion == pHead->head.version); @@ -203,12 +203,12 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { return 0; } -int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { - SWalReadHead *pReadHead = &((*ppHead)->head); - int64_t ver = pReadHead->version; +int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) { + SWalCont *pReadHead = &((*ppHead)->head); + int64_t ver = pReadHead->version; if (pRead->capacity < pReadHead->bodyLen) { - void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen); + void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; @@ -241,18 +241,18 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { return 0; } -int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) { +int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) { taosThreadMutexLock(&pRead->mutex); if (walReadWithHandle(pRead, ver) < 0) { taosThreadMutexUnlock(&pRead->mutex); return -1; } - *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); + *ppHead = taosMemoryMalloc(sizeof(SWalCont) + pRead->pHead->head.bodyLen); if (*ppHead == NULL) { taosThreadMutexUnlock(&pRead->mutex); return -1; } - memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); + memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalCont) + pRead->pHead->head.bodyLen); taosThreadMutexUnlock(&pRead->mutex); return 0; } @@ -282,8 +282,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ASSERT(taosValidFile(pRead->pReadLogTFile) == true); - code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); - if (code != sizeof(SWalHead)) { + code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead)); + if (code != sizeof(SWalCkHead)) { if (code < 0) terrno = TAOS_SYSTEM_ERROR(errno); else { @@ -301,7 +301,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } if (pRead->capacity < pRead->pHead->head.bodyLen) { - void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); + void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d1295667af..9245c03826 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -142,10 +142,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return -1; } // validate offset - SWalHead head; + SWalCkHead head; ASSERT(taosValidFile(pLogTFile)); - int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalHead)); - if (size != sizeof(SWalHead)) { + int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); + if (size != sizeof(SWalCkHead)) { return -1; } code = walValidHeadCksum(&head); @@ -261,7 +261,7 @@ int32_t walEndSnapshot(SWal *pWal) { } int walRoll(SWal *pWal) { - int code = 0; + int32_t code = 0; if (pWal->pWriteIdxTFile != NULL) { code = taosCloseFile(&pWal->pWriteIdxTFile); if (code != 0) { @@ -321,12 +321,13 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { return 0; } -int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, +int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, int32_t bodyLen) { - int code = 0; + int32_t code = 0; // no wal if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0; + if (bodyLen > TSDB_MAX_WAL_SIZE) { terrno = TSDB_CODE_WAL_SIZE_LIMIT; return -1; @@ -356,6 +357,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } + /*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/ ASSERT(pWal->writeCur >= 0); @@ -380,7 +382,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { + if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), @@ -405,19 +407,19 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog // set status if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index; pWal->vers.lastVer = index; - pWal->totSize += sizeof(SWalHead) + bodyLen; + pWal->totSize += sizeof(SWalCkHead) + bodyLen; if (walGetCurFileInfo(pWal)->firstVer == -1) { walGetCurFileInfo(pWal)->firstVer = index; } walGetCurFileInfo(pWal)->lastVer = index; - walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; + walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen; taosThreadMutexUnlock(&pWal->mutex); return 0; } -int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { +int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { SSyncLogMeta syncMeta = { .isWeek = -1, .seqNum = UINT64_MAX, @@ -435,27 +437,3 @@ void walFsync(SWal *pWal, bool forceFsync) { } } } - -/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/ -/*int code = 0;*/ -/*SWalHead *pHead = NULL;*/ -/*code = (int)walRead(pWal, &pHead, ver);*/ -/*if(pHead->head.version != ver) {*/ -/*return -1;*/ -/*}*/ -/*return 0;*/ -/*}*/ - -/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/ -/*int code = walSeekVer(pWal, ver);*/ -/*if(code != 0) {*/ -/*return -1;*/ -/*}*/ - -/*code = walValidateOffset(pWal, ver);*/ -/*if(code != 0) {*/ -/*return -1;*/ -/*}*/ - -/*return 0;*/ -/*}*/ diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index b1c673e87b..89c4fd9ef2 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -148,7 +148,7 @@ TEST_F(WalCleanEnv, createNew) { walRollFileInfo(pWal); ASSERT(pWal->fileInfoSet != NULL); ASSERT_EQ(pWal->fileInfoSet->size, 1); - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); ASSERT_EQ(pInfo->firstVer, 0); ASSERT_EQ(pInfo->lastVer, -1); ASSERT_EQ(pInfo->closeTs, -1); diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 291d38bdca..65471df0a9 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -300,16 +300,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { return pFile; } -int64_t taosCloseFile(TdFilePtr *ppFile) { +int32_t taosCloseFile(TdFilePtr *ppFile) { + int32_t code = 0; if (ppFile == NULL || *ppFile == NULL) { return 0; } #if FILE_WITH_LOCK taosThreadRwlockWrlock(&((*ppFile)->rwlock)); #endif - if (ppFile == NULL || *ppFile == NULL) { - return 0; - } if ((*ppFile)->fp != NULL) { fflush((*ppFile)->fp); fclose((*ppFile)->fp); @@ -320,9 +318,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd); !FlushFileBuffers(h); #else - fsync((*ppFile)->fd); + // warning: never fsync silently in base lib + /*fsync((*ppFile)->fd);*/ #endif - close((*ppFile)->fd); + code = close((*ppFile)->fd); (*ppFile)->fd = -1; } (*ppFile)->refId = 0; @@ -332,7 +331,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { #endif taosMemoryFree(*ppFile); *ppFile = NULL; - return 0; + return code; } int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { @@ -560,6 +559,8 @@ int32_t taosFsyncFile(TdFilePtr pFile) { return 0; } + // this implementation is WRONG + // fflush is not a replacement of fsync if (pFile->fp != NULL) return fflush(pFile->fp); if (pFile->fd >= 0) { #ifdef WINDOWS diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 23e79da948..d16d0e610a 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -206,11 +206,11 @@ void* taosArrayGetP(const SArray* pArray, size_t index) { void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } -size_t taosArrayGetSize(const SArray* pArray) { +int32_t taosArrayGetSize(const SArray* pArray) { if (pArray == NULL) { return 0; } - return pArray->size; + return (int32_t)pArray->size; } void taosArraySetSize(SArray* pArray, size_t size) { diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 0534eb3462..addb9f55ba 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -16,6 +16,23 @@ #define _DEFAULT_SOURCE #include "tutil.h" +void *tmemmem(const char *haystack, int32_t hlen, const char *needle, int32_t nlen) { + const char *limit; + + if (nlen == 0 || hlen < nlen) { + return NULL; + } + + limit = haystack + hlen - nlen + 1; + while ((haystack = (char *)memchr(haystack, needle[0], limit - haystack)) != NULL) { + if (memcmp(haystack, needle, nlen) == 0) { + return (void *)haystack; + } + haystack++; + } + return NULL; +} + int32_t strdequote(char *z) { if (z == NULL) { return 0;