add serial

This commit is contained in:
yihaoDeng 2024-10-08 19:44:56 +08:00 committed by Jing Sima
parent 11ed8a1540
commit ecfa67510c
2 changed files with 132 additions and 22 deletions

View File

@ -3289,13 +3289,31 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen);
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen);
char* tVal;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code;
}
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
@ -3541,14 +3559,31 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
// func cf
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
int code = 0;
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
return 0;
int code = 0;
char* tVal = NULL;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, tVal, &tValLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code;
}
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int code = 0;
@ -3563,7 +3598,15 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
if (value == NULL || vLen == 0) {
stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
}
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
@ -3861,13 +3904,30 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
char* tVal;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
@ -4204,21 +4264,44 @@ _end:
#ifdef BUILD_NO_CALL
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
int code = 0;
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);
int code = 0;
char* tVal;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen);
taosMemoryFree(tVal);
return code;
}
#endif
// parname cfg
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
int code = 0;
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
@ -4229,13 +4312,30 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
int code = 0;
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size);
taosMemoryFree(dst);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
int code = 0;
char* tVal;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
@ -4377,10 +4477,18 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
char buf[128] = {0};
char buf[128] = {0};
char* dst = NULL;
size_t size = 0;
int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
if (code != 0) {
return code;
}
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
@ -4389,6 +4497,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(dst);
if (tmpBuf == NULL) {
taosMemoryFree(ttlV);
}

View File

@ -698,7 +698,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
char* buf = taosMemoryCalloc(1, len);
if (!buf) {
code = terrno;