contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3.1 vs contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3.2

- old
+ new

@@ -18,12 +18,11 @@ /* ====== Constants ====== */ #define ZSTDMT_OVERLAPLOG_DEFAULT 0 /* ====== Dependencies ====== */ -#include <string.h> /* memcpy, memset */ -#include <limits.h> /* INT_MAX, UINT_MAX */ +#include "../common/zstd_deps.h" /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */ #include "../common/mem.h" /* MEM_STATIC */ #include "../common/pool.h" /* threadpool */ #include "../common/threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstd_ldm.h" @@ -104,15 +103,15 @@ } ZSTDMT_bufferPool; static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem) { unsigned const maxNbBuffers = 2*nbWorkers + 3; - ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( + ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc( sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); if (bufPool==NULL) return NULL; if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) { - ZSTD_free(bufPool, cMem); + ZSTD_customFree(bufPool, cMem); return NULL; } bufPool->bufferSize = 64 KB; bufPool->totalBuffers = maxNbBuffers; bufPool->nbBuffers = 0; @@ -125,14 +124,14 @@ unsigned u; DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool); if (!bufPool) return; /* compatibility with free on NULL */ for (u=0; u<bufPool->totalBuffers; u++) { DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start); - ZSTD_free(bufPool->bTable[u].start, bufPool->cMem); + ZSTD_customFree(bufPool->bTable[u].start, bufPool->cMem); } ZSTD_pthread_mutex_destroy(&bufPool->poolMutex); - ZSTD_free(bufPool, bufPool->cMem); + ZSTD_customFree(bufPool, bufPool->cMem); } /* only works at initialization, not during compression */ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) { @@ -199,17 +198,17 @@ ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return buf; } /* size conditions not respected : scratch this buffer, create new one */ DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing"); - ZSTD_free(buf.start, bufPool->cMem); + ZSTD_customFree(buf.start, bufPool->cMem); } ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); /* create new buffer */ DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer"); { buffer_t buffer; - void* const start = ZSTD_malloc(bSize, bufPool->cMem); + void* const start = ZSTD_customMalloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.capacity = (start==NULL) ? 0 : bSize; if (start==NULL) { DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!"); } else { @@ -227,17 +226,17 @@ */ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) { size_t const bSize = bufPool->bufferSize; if (buffer.capacity < bSize) { - void* const start = ZSTD_malloc(bSize, bufPool->cMem); + void* const start = ZSTD_customMalloc(bSize, bufPool->cMem); buffer_t newBuffer; newBuffer.start = start; newBuffer.capacity = start == NULL ? 0 : bSize; if (start != NULL) { assert(newBuffer.capacity >= buffer.capacity); - memcpy(newBuffer.start, buffer.start, buffer.capacity); + ZSTD_memcpy(newBuffer.start, buffer.start, buffer.capacity); DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize); return newBuffer; } DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!"); } @@ -259,28 +258,26 @@ return; } ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); /* Reached bufferPool capacity (should not happen) */ DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing "); - ZSTD_free(buf.start, bufPool->cMem); + ZSTD_customFree(buf.start, bufPool->cMem); } /* ===== Seq Pool Wrapper ====== */ -static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0}; - typedef ZSTDMT_bufferPool ZSTDMT_seqPool; static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool) { return ZSTDMT_sizeof_bufferPool(seqPool); } static rawSeqStore_t bufferToSeq(buffer_t buffer) { - rawSeqStore_t seq = {NULL, 0, 0, 0}; + rawSeqStore_t seq = kNullRawSeqStore; seq.seq = (rawSeq*)buffer.start; seq.capacity = buffer.capacity / sizeof(rawSeq); return seq; } @@ -352,24 +349,24 @@ { int cid; for (cid=0; cid<pool->totalCCtx; cid++) ZSTD_freeCCtx(pool->cctx[cid]); /* note : compatible with free on NULL */ ZSTD_pthread_mutex_destroy(&pool->poolMutex); - ZSTD_free(pool, pool->cMem); + ZSTD_customFree(pool, pool->cMem); } /* ZSTDMT_createCCtxPool() : * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers, ZSTD_customMem cMem) { - ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( + ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_customCalloc( sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem); assert(nbWorkers > 0); if (!cctxPool) return NULL; if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { - ZSTD_free(cctxPool, cMem); + ZSTD_customFree(cctxPool, cMem); return NULL; } cctxPool->cMem = cMem; cctxPool->totalCCtx = nbWorkers; cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ @@ -476,11 +473,11 @@ assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); assert(params.ldmParams.hashRateLog < 32); serialState->ldmState.hashPower = ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength); } else { - memset(&params.ldmParams, 0, sizeof(params.ldmParams)); + ZSTD_memset(&params.ldmParams, 0, sizeof(params.ldmParams)); } serialState->nextJobID = 0; if (params.fParams.checksumFlag) XXH64_reset(&serialState->xxhState, 0); if (params.ldmParams.enableLdm) { @@ -497,22 +494,22 @@ ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize)); /* Reset the window */ ZSTD_window_init(&serialState->ldmState.window); /* Resize tables and output space if necessary. */ if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { - ZSTD_free(serialState->ldmState.hashTable, cMem); - serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem); + ZSTD_customFree(serialState->ldmState.hashTable, cMem); + serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_customMalloc(hashSize, cMem); } if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) { - ZSTD_free(serialState->ldmState.bucketOffsets, cMem); - serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem); + ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); + serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_customMalloc(bucketSize, cMem); } if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets) return 1; /* Zero the tables */ - memset(serialState->ldmState.hashTable, 0, hashSize); - memset(serialState->ldmState.bucketOffsets, 0, bucketSize); + ZSTD_memset(serialState->ldmState.hashTable, 0, hashSize); + ZSTD_memset(serialState->ldmState.bucketOffsets, 0, bucketSize); /* Update window state and fill hash table with dict */ serialState->ldmState.loadedDictEnd = 0; if (dictSize > 0) { if (dictContentType == ZSTD_dct_rawContent) { @@ -535,11 +532,11 @@ } static int ZSTDMT_serialState_init(serialState_t* serialState) { int initError = 0; - memset(serialState, 0, sizeof(*serialState)); + ZSTD_memset(serialState, 0, sizeof(*serialState)); initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL); initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL); initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL); initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL); return initError; @@ -550,12 +547,12 @@ ZSTD_customMem cMem = serialState->params.customMem; ZSTD_pthread_mutex_destroy(&serialState->mutex); ZSTD_pthread_cond_destroy(&serialState->cond); ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex); ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond); - ZSTD_free(serialState->ldmState.hashTable, cMem); - ZSTD_free(serialState->ldmState.bucketOffsets, cMem); + ZSTD_customFree(serialState->ldmState.hashTable, cMem); + ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); } static void ZSTDMT_serialState_update(serialState_t* serialState, ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore, range_t src, unsigned jobID) @@ -818,11 +815,10 @@ int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */ inBuff_t inBuff; roundBuff_t roundBuff; serialState_t serial; rsyncState_t rsync; - unsigned singleBlockingThread; unsigned jobIDMask; unsigned doneJobID; unsigned nextJobID; unsigned frameEnded; unsigned allJobsCompleted; @@ -830,21 +826,22 @@ unsigned long long consumed; unsigned long long produced; ZSTD_customMem cMem; ZSTD_CDict* cdictLocal; const ZSTD_CDict* cdict; + unsigned providedFactory: 1; }; static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem) { U32 jobNb; if (jobTable == NULL) return; for (jobNb=0; jobNb<nbJobs; jobNb++) { ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex); ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond); } - ZSTD_free(jobTable, cMem); + ZSTD_customFree(jobTable, cMem); } /* ZSTDMT_allocJobsTable() * allocate and init a job table. * update *nbJobsPtr to next power of 2 value, as size of table */ @@ -852,11 +849,11 @@ { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; U32 const nbJobs = 1 << nbJobsLog2; U32 jobNb; ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*) - ZSTD_calloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem); + ZSTD_customCalloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem); int initError = 0; if (jobTable==NULL) return NULL; *nbJobsPtr = nbJobs; for (jobNb=0; jobNb<nbJobs; jobNb++) { initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL); @@ -883,16 +880,16 @@ } /* ZSTDMT_CCtxParam_setNbWorkers(): * Internal use only */ -size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) +static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) { return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers); } -MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem) +MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool) { ZSTDMT_CCtx* mtctx; U32 nbJobs = nbWorkers + 2; int initError; DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers); @@ -901,16 +898,23 @@ nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX); if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) /* invalid custom allocator */ return NULL; - mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); + mtctx = (ZSTDMT_CCtx*) ZSTD_customCalloc(sizeof(ZSTDMT_CCtx), cMem); if (!mtctx) return NULL; ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); mtctx->cMem = cMem; mtctx->allJobsCompleted = 1; - mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem); + if (pool != NULL) { + mtctx->factory = pool; + mtctx->providedFactory = 1; + } + else { + mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem); + mtctx->providedFactory = 0; + } mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); @@ -923,27 +927,23 @@ } DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers); return mtctx; } -ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) +ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool) { #ifdef ZSTD_MULTITHREAD - return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem); + return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem, pool); #else (void)nbWorkers; (void)cMem; + (void)pool; return NULL; #endif } -ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers) -{ - return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem); -} - /* ZSTDMT_releaseAllJobResources() : * note : ensure all workers are killed first ! */ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) { unsigned jobID; @@ -955,11 +955,11 @@ DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); /* Clear the job description, but keep the mutex/cond */ - memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID])); + ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID])); mtctx->jobs[jobID].job_mutex = mutex; mtctx->jobs[jobID].job_cond = cond; } mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.filled = 0; @@ -982,21 +982,22 @@ } size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) { if (mtctx==NULL) return 0; /* compatible with free on NULL */ - POOL_free(mtctx->factory); /* stop and free worker threads */ + if (!mtctx->providedFactory) + POOL_free(mtctx->factory); /* stop and free worker threads */ ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */ ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTDMT_freeSeqPool(mtctx->seqPool); ZSTDMT_serialState_free(&mtctx->serial); ZSTD_freeCDict(mtctx->cdictLocal); if (mtctx->roundBuff.buffer) - ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); - ZSTD_free(mtctx, mtctx->cMem); + ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); + ZSTD_customFree(mtctx, mtctx->cMem); return 0; } size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) { @@ -1009,70 +1010,11 @@ + ZSTDMT_sizeof_seqPool(mtctx->seqPool) + ZSTD_sizeof_CDict(mtctx->cdictLocal) + mtctx->roundBuff.capacity; } -/* Internal only */ -size_t -ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, - ZSTDMT_parameter parameter, - int value) -{ - DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter"); - switch(parameter) - { - case ZSTDMT_p_jobSize : - DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value); - case ZSTDMT_p_overlapLog : - DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value); - case ZSTDMT_p_rsyncable : - DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value); - default : - return ERROR(parameter_unsupported); - } -} -size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value) -{ - DEBUGLOG(4, "ZSTDMT_setMTCtxParameter"); - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); -} - -size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value) -{ - switch (parameter) { - case ZSTDMT_p_jobSize: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value); - case ZSTDMT_p_overlapLog: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value); - case ZSTDMT_p_rsyncable: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value); - default: - return ERROR(parameter_unsupported); - } -} - -/* Sets parameters relevant to the compression job, - * initializing others to default values. */ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params) -{ - ZSTD_CCtx_params jobParams = *params; - /* Clear parameters related to multithreading */ - jobParams.forceWindow = 0; - jobParams.nbWorkers = 0; - jobParams.jobSize = 0; - jobParams.overlapLog = 0; - jobParams.rsyncable = 0; - memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t)); - memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem)); - return jobParams; -} - - /* ZSTDMT_resize() : * @return : error code if fails, 0 on success */ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) { if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); @@ -1096,11 +1038,11 @@ U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ int const compressionLevel = cctxParams->compressionLevel; DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)", compressionLevel); mtctx->params.compressionLevel = compressionLevel; - { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0); + { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0, ZSTD_cpm_noAttachDict); cParams.windowLog = saved_wlog; mtctx->params.cParams = cParams; } } @@ -1183,12 +1125,12 @@ { unsigned jobLog; if (params->ldmParams.enableLdm) { /* In Long Range Mode, the windowLog is typically oversized. * In which case, it's preferable to determine the jobSize - * based on chainLog instead. */ - jobLog = MAX(21, params->cParams.chainLog + 4); + * based on cycleLog instead. */ + jobLog = MAX(21, ZSTD_cycleLog(params->cParams.chainLog, params->cParams.strategy) + 3); } else { jobLog = MAX(20, params->cParams.windowLog + 2); } return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX); } @@ -1238,178 +1180,10 @@ DEBUGLOG(4, "overlapLog : %i", params->overlapLog); DEBUGLOG(4, "overlap size : %i", 1 << ovLog); return (ovLog==0) ? 0 : (size_t)1 << ovLog; } -static unsigned -ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers) -{ - assert(nbWorkers>0); - { size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params); - size_t const jobMaxSize = jobSizeTarget << 2; - size_t const passSizeMax = jobMaxSize * nbWorkers; - unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbJobsLarge = multiplier * nbWorkers; - unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; - unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers); - return (multiplier>1) ? nbJobsLarge : nbJobsSmall; -} } - -/* ZSTDMT_compress_advanced_internal() : - * This is a blocking function : it will only give back control to caller after finishing its compression job. - */ -static size_t -ZSTDMT_compress_advanced_internal( - ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_CCtx_params params) -{ - ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params); - size_t const overlapSize = ZSTDMT_computeOverlapSize(&params); - unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers); - size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; - size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ - const char* const srcStart = (const char*)src; - size_t remainingSrcSize = srcSize; - unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ - size_t frameStartPos = 0, dstBufferPos = 0; - assert(jobParams.nbWorkers == 0); - assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); - - params.jobSize = (U32)avgJobSize; - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", - nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - - if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ - ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode"); - if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); - return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams); - } - - assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ - ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - /* LDM doesn't even try to load the dictionary in single-ingestion mode */ - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto)) - return ERROR(memory_allocation); - - FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , ""); /* only expands if necessary */ - - { unsigned u; - for (u=0; u<nbJobs; u++) { - size_t const jobSize = MIN(remainingSrcSize, avgJobSize); - size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); - buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; - buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer; - size_t dictSize = u ? overlapSize : 0; - - mtctx->jobs[u].prefix.start = srcStart + frameStartPos - dictSize; - mtctx->jobs[u].prefix.size = dictSize; - mtctx->jobs[u].src.start = srcStart + frameStartPos; - mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0); /* avoid job.src.size == 0 */ - mtctx->jobs[u].consumed = 0; - mtctx->jobs[u].cSize = 0; - mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; - mtctx->jobs[u].fullFrameSize = srcSize; - mtctx->jobs[u].params = jobParams; - /* do not calculate checksum within sections, but write it in header for first section */ - mtctx->jobs[u].dstBuff = dstBuffer; - mtctx->jobs[u].cctxPool = mtctx->cctxPool; - mtctx->jobs[u].bufPool = mtctx->bufPool; - mtctx->jobs[u].seqPool = mtctx->seqPool; - mtctx->jobs[u].serial = &mtctx->serial; - mtctx->jobs[u].jobID = u; - mtctx->jobs[u].firstJob = (u==0); - mtctx->jobs[u].lastJob = (u==nbJobs-1); - - DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12); - POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); - - frameStartPos += jobSize; - dstBufferPos += dstBufferCapacity; - remainingSrcSize -= jobSize; - } } - - /* collect result */ - { size_t error = 0, dstPos = 0; - unsigned jobID; - for (jobID=0; jobID<nbJobs; jobID++) { - DEBUGLOG(5, "waiting for job %u ", jobID); - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); - while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); - ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); - } - ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); - DEBUGLOG(5, "ready to write job %u ", jobID); - - { size_t const cSize = mtctx->jobs[jobID].cSize; - if (ZSTD_isError(cSize)) error = cSize; - if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); - if (jobID) { /* note : job 0 is written directly at dst, which is correct position */ - if (!error) - memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */ - if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */ - DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); - } } - mtctx->jobs[jobID].dstBuff = g_nullBuffer; - mtctx->jobs[jobID].cSize = 0; - dstPos += cSize ; - } - } /* for (jobID=0; jobID<nbJobs; jobID++) */ - - DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag); - if (params.fParams.checksumFlag) { - U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState); - if (dstPos + 4 > dstCapacity) { - error = ERROR(dstSize_tooSmall); - } else { - DEBUGLOG(4, "writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)dst + dstPos, checksum); - dstPos += 4; - } } - - if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); - return error ? error : dstPos; - } -} - -size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_parameters params, - int overlapLog) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX); - cctxParams.overlapLog = overlapLog; - return ZSTDMT_compress_advanced_internal(mtctx, - dst, dstCapacity, - src, srcSize, - cdict, cctxParams); -} - - -size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - int compressionLevel) -{ - ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); - int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy); - params.fParams.contentSizeFlag = 1; - return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog); -} - - /* ====================================== */ /* ======= Streaming API ======= */ /* ====================================== */ size_t ZSTDMT_initCStream_internal( @@ -1430,20 +1204,10 @@ FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , ""); if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX; - mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ - if (mtctx->singleBlockingThread) { - ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(&params); - DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); - assert(singleThreadParams.nbWorkers == 0); - return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], - dict, dictSize, cdict, - &singleThreadParams, pledgedSrcSize); - } - DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_releaseAllJobResources(mtctx); @@ -1502,12 +1266,12 @@ size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers; size_t const capacity = MAX(windowSize, sectionsSize) + slackSize; if (mtctx->roundBuff.capacity < capacity) { if (mtctx->roundBuff.buffer) - ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); - mtctx->roundBuff.buffer = (BYTE*)ZSTD_malloc(capacity, mtctx->cMem); + ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); + mtctx->roundBuff.buffer = (BYTE*)ZSTD_customMalloc(capacity, mtctx->cMem); if (mtctx->roundBuff.buffer == NULL) { mtctx->roundBuff.capacity = 0; return ERROR(memory_allocation); } mtctx->roundBuff.capacity = capacity; @@ -1528,58 +1292,11 @@ dict, dictSize, dictContentType)) return ERROR(memory_allocation); return 0; } -size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, - ZSTD_parameters params, - unsigned long long pledgedSrcSize) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ - DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize); - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dct_auto, NULL, - cctxParams, pledgedSrcSize); -} -size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, - const ZSTD_CDict* cdict, - ZSTD_frameParameters fParams, - unsigned long long pledgedSrcSize) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; - if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ - cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict); - cctxParams.fParams = fParams; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dct_auto, cdict, - cctxParams, pledgedSrcSize); -} - - -/* ZSTDMT_resetCStream() : - * pledgedSrcSize can be zero == unknown (for the time being) - * prefer using ZSTD_CONTENTSIZE_UNKNOWN, - * as `0` might mean "empty" in the future */ -size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) -{ - if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, 0, mtctx->params, - pledgedSrcSize); -} - -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { - ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); - ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ - DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); -} - - /* ZSTDMT_writeLastEmptyBlock() * Write a single empty block with an end-of-frame to finish a frame. * Job must be created from streaming variant. * This function is always successful if expected conditions are fulfilled. */ @@ -1738,11 +1455,11 @@ (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); assert(mtctx->jobs[wJobID].dstBuff.start != NULL); if (toFlush > 0) { - memcpy((char*)output->dst + output->pos, + ZSTD_memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toFlush); } output->pos += toFlush; mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ @@ -1892,11 +1609,11 @@ if (ZSTDMT_isOverlapped(buffer, inUse)) { DEBUGLOG(5, "Waiting for buffer..."); return 0; } ZSTDMT_waitForLdmComplete(mtctx, buffer); - memmove(start, mtctx->inBuff.prefix.start, prefixSize); + ZSTD_memmove(start, mtctx->inBuff.prefix.start, prefixSize); mtctx->inBuff.prefix.start = start; mtctx->roundBuff.pos = prefixSize; } buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; buffer.capacity = target; @@ -1966,10 +1683,20 @@ * Start scanning at the beginning of the input. */ pos = 0; prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH; hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH); + if ((hash & hitMask) == hitMask) { + /* We're already at a sync point so don't load any more until + * we're able to flush this sync point. + * This likely happened because the job table was full so we + * couldn't add our job. + */ + syncPoint.toLoad = 0; + syncPoint.flush = 1; + return syncPoint; + } } else { /* We don't have enough bytes buffered to initialize the hash, but * we know we have at least RSYNC_LENGTH bytes total. * Start scanning after the first RSYNC_LENGTH bytes less the bytes * already buffered. @@ -2020,38 +1747,15 @@ DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)", (U32)endOp, (U32)(input->size - input->pos)); assert(output->pos <= output->size); assert(input->pos <= input->size); - if (mtctx->singleBlockingThread) { /* delegate to single-thread (synchronous) */ - return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp); - } - if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { /* current frame being ended. Only flush/end are allowed */ return ERROR(stage_wrong); } - /* single-pass shortcut (note : synchronous-mode) */ - if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */ - && (mtctx->nextJobID == 0) /* just started */ - && (mtctx->inBuff.filled == 0) /* nothing buffered */ - && (!mtctx->jobReady) /* no job already created */ - && (endOp == ZSTD_e_end) /* end order */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ - size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, - (char*)output->dst + output->pos, output->size - output->pos, - (const char*)input->src + input->pos, input->size - input->pos, - mtctx->cdict, mtctx->params); - if (ZSTD_isError(cSize)) return cSize; - input->pos = input->size; - output->pos += cSize; - mtctx->allJobsCompleted = 1; - mtctx->frameEnded = 1; - return 0; - } - /* fill input buffer */ if ( (!mtctx->jobReady) && (input->size > input->pos) ) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ @@ -2070,18 +1774,26 @@ endOp = ZSTD_e_flush; } assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); - memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad); + ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad); input->pos += syncPoint.toLoad; mtctx->inBuff.filled += syncPoint.toLoad; forwardInputProgress = syncPoint.toLoad>0; } - if ((input->pos < input->size) && (endOp == ZSTD_e_end)) - endOp = ZSTD_e_flush; /* can't end now : not all input consumed */ } + if ((input->pos < input->size) && (endOp == ZSTD_e_end)) { + /* Can't end yet because the input is not fully consumed. + * We are in one of these cases: + * - mtctx->inBuff is NULL & empty: we couldn't get an input buffer so don't create a new job. + * - We filled the input buffer: flush this job but don't end the frame. + * - We hit a synchronization point: flush this job but don't end the frame. + */ + assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable); + endOp = ZSTD_e_flush; + } if ( (mtctx->jobReady) || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */ || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */ @@ -2094,50 +1806,6 @@ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush); return remainingToFlush; } -} - - -size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) -{ - FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , ""); - - /* recommended next input size : fill current input buffer */ - return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ -} - - -static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) -{ - size_t const srcSize = mtctx->inBuff.filled; - DEBUGLOG(5, "ZSTDMT_flushStream_internal"); - - if ( mtctx->jobReady /* one job ready for a worker to pick up */ - || (srcSize > 0) /* still some data within input buffer */ - || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ - DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", - (U32)srcSize, (U32)endFrame); - FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , ""); - } - - /* check if there is any data available to flush */ - return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); -} - - -size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) -{ - DEBUGLOG(5, "ZSTDMT_flushStream"); - if (mtctx->singleBlockingThread) - return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); -} - -size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) -{ - DEBUGLOG(4, "ZSTDMT_endStream"); - if (mtctx->singleBlockingThread) - return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); }