contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3 vs contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3.1

- old
+ new

@@ -1,7 +1,7 @@ /* - * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. + * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the * LICENSE file in the root directory of this source tree) and the GPLv2 (found * in the COPYING file in the root directory of this source tree). @@ -20,13 +20,13 @@ /* ====== Dependencies ====== */ #include <string.h> /* memcpy, memset */ #include <limits.h> /* INT_MAX, UINT_MAX */ -#include "mem.h" /* MEM_STATIC */ -#include "pool.h" /* threadpool */ -#include "threading.h" /* mutex */ +#include "../common/mem.h" /* MEM_STATIC */ +#include "../common/pool.h" /* threadpool */ +#include "../common/threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstd_ldm.h" #include "zstdmt_compress.h" /* Guards code to support resizing the SeqPool. @@ -459,11 +459,17 @@ ZSTD_pthread_mutex_t ldmWindowMutex; ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */ ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ } serialState_t; -static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize) +static int +ZSTDMT_serialState_reset(serialState_t* serialState, + ZSTDMT_seqPool* seqPool, + ZSTD_CCtx_params params, + size_t jobSize, + const void* dict, size_t const dictSize, + ZSTD_dictContentType_e dictContentType) { /* Adjust parameters */ if (params.ldmParams.enableLdm) { DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams); @@ -488,12 +494,11 @@ serialState->params.ldmParams.hashLog - serialState->params.ldmParams.bucketSizeLog; /* Size the seq pool tables */ ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize)); /* Reset the window */ - ZSTD_window_clear(&serialState->ldmState.window); - serialState->ldmWindow = serialState->ldmState.window; + ZSTD_window_init(&serialState->ldmState.window); /* Resize tables and output space if necessary. */ if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { ZSTD_free(serialState->ldmState.hashTable, cMem); serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem); } @@ -504,11 +509,28 @@ if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets) return 1; /* Zero the tables */ memset(serialState->ldmState.hashTable, 0, hashSize); memset(serialState->ldmState.bucketOffsets, 0, bucketSize); + + /* Update window state and fill hash table with dict */ + serialState->ldmState.loadedDictEnd = 0; + if (dictSize > 0) { + if (dictContentType == ZSTD_dct_rawContent) { + BYTE const* const dictEnd = (const BYTE*)dict + dictSize; + ZSTD_window_update(&serialState->ldmState.window, dict, dictSize); + ZSTD_ldm_fillHashTable(&serialState->ldmState, (const BYTE*)dict, dictEnd, &params.ldmParams); + serialState->ldmState.loadedDictEnd = params.forceWindow ? 0 : (U32)(dictEnd - serialState->ldmState.window.base); + } else { + /* don't even load anything */ + } + } + + /* Initialize serialState's copy of ldmWindow. */ + serialState->ldmWindow = serialState->ldmState.window; } + serialState->params = params; serialState->params.jobSize = (U32)jobSize; return 0; } @@ -666,11 +688,11 @@ jobParams.ldmParams.enableLdm = 0; /* init */ if (job->cdict) { - size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize); + size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize); assert(job->firstJob); /* only allowed for first job */ if (ZSTD_isError(initError)) JOB_ERROR(initError); } else { /* srcStart points at reloaded section */ U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; { size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob); @@ -678,11 +700,11 @@ } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ ZSTD_dtlm_fast, NULL, /*cdict*/ - jobParams, pledgedSrcSize); + &jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) JOB_ERROR(initError); } } /* Perform serial step as early as possible, but after CCtx initialization */ ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); @@ -925,16 +947,22 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) { unsigned jobID; DEBUGLOG(3, "ZSTDMT_releaseAllJobResources"); for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { + /* Copy the mutex/cond out */ + ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex; + ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond; + DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); - mtctx->jobs[jobID].dstBuff = g_nullBuffer; - mtctx->jobs[jobID].cSize = 0; + + /* Clear the job description, but keep the mutex/cond */ + memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID])); + mtctx->jobs[jobID].job_mutex = mutex; + mtctx->jobs[jobID].job_cond = cond; } - memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.filled = 0; mtctx->allJobsCompleted = 1; } @@ -1026,13 +1054,13 @@ } } /* Sets parameters relevant to the compression job, * initializing others to default values. */ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) +static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params) { - ZSTD_CCtx_params jobParams = params; + ZSTD_CCtx_params jobParams = *params; /* Clear parameters related to multithreading */ jobParams.forceWindow = 0; jobParams.nbWorkers = 0; jobParams.jobSize = 0; jobParams.overlapLog = 0; @@ -1046,11 +1074,11 @@ /* ZSTDMT_resize() : * @return : error code if fails, 0 on success */ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) { if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); - FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) ); + FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , ""); mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers); if (mtctx->bufPool == NULL) return ERROR(memory_allocation); mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers); if (mtctx->cctxPool == NULL) return ERROR(memory_allocation); mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers); @@ -1068,11 +1096,11 @@ U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ int const compressionLevel = cctxParams->compressionLevel; DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)", compressionLevel); mtctx->params.compressionLevel = compressionLevel; - { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, 0, 0); + { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0); cParams.windowLog = saved_wlog; mtctx->params.cParams = cParams; } } @@ -1127,13 +1155,18 @@ ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); { size_t const cResult = jobPtr->cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; assert(flushed <= produced); + assert(jobPtr->consumed <= jobPtr->src.size); toFlush = produced - flushed; - if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) { - /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */ + /* if toFlush==0, nothing is available to flush. + * However, jobID is expected to still be active: + * if jobID was already completed and fully flushed, + * ZSTDMT_flushProduced() should have already moved onto next job. + * Therefore, some input has not yet been consumed. */ + if (toFlush==0) { assert(jobPtr->consumed < jobPtr->src.size); } } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } @@ -1144,18 +1177,22 @@ /* ------------------------------------------ */ /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -static unsigned ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params) +static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params) { - if (params.ldmParams.enableLdm) + unsigned jobLog; + if (params->ldmParams.enableLdm) { /* In Long Range Mode, the windowLog is typically oversized. * In which case, it's preferable to determine the jobSize * based on chainLog instead. */ - return MAX(21, params.cParams.chainLog + 4); - return MAX(20, params.cParams.windowLog + 2); + jobLog = MAX(21, params->cParams.chainLog + 4); + } else { + jobLog = MAX(20, params->cParams.windowLog + 2); + } + return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX); } static int ZSTDMT_overlapLog_default(ZSTD_strategy strat) { switch(strat) @@ -1182,31 +1219,31 @@ assert(0 <= ovlog && ovlog <= 9); if (ovlog == 0) return ZSTDMT_overlapLog_default(strat); return ovlog; } -static size_t ZSTDMT_computeOverlapSize(ZSTD_CCtx_params const params) +static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params) { - int const overlapRLog = 9 - ZSTDMT_overlapLog(params.overlapLog, params.cParams.strategy); - int ovLog = (overlapRLog >= 8) ? 0 : (params.cParams.windowLog - overlapRLog); + int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy); + int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog); assert(0 <= overlapRLog && overlapRLog <= 8); - if (params.ldmParams.enableLdm) { + if (params->ldmParams.enableLdm) { /* In Long Range Mode, the windowLog is typically oversized. * In which case, it's preferable to determine the jobSize * based on chainLog instead. * Then, ovLog becomes a fraction of the jobSize, rather than windowSize */ - ovLog = MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) + ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog; } - assert(0 <= ovLog && ovLog <= 30); - DEBUGLOG(4, "overlapLog : %i", params.overlapLog); + assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX); + DEBUGLOG(4, "overlapLog : %i", params->overlapLog); DEBUGLOG(4, "overlap size : %i", 1 << ovLog); return (ovLog==0) ? 0 : (size_t)1 << ovLog; } static unsigned -ZSTDMT_computeNbJobs(ZSTD_CCtx_params params, size_t srcSize, unsigned nbWorkers) +ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers) { assert(nbWorkers>0); { size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params); size_t const jobMaxSize = jobSizeTarget << 2; size_t const passSizeMax = jobMaxSize * nbWorkers; @@ -1218,20 +1255,21 @@ } } /* ZSTDMT_compress_advanced_internal() : * This is a blocking function : it will only give back control to caller after finishing its compression job. */ -static size_t ZSTDMT_compress_advanced_internal( +static size_t +ZSTDMT_compress_advanced_internal( ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, ZSTD_CCtx_params params) { - ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); - size_t const overlapSize = ZSTDMT_computeOverlapSize(params); - unsigned const nbJobs = ZSTDMT_computeNbJobs(params, srcSize, params.nbWorkers); + ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params); + size_t const overlapSize = ZSTDMT_computeOverlapSize(&params); + unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers); size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; size_t remainingSrcSize = srcSize; unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ @@ -1245,19 +1283,20 @@ if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode"); if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); - return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); + return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams); } assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize)) + /* LDM doesn't even try to load the dictionary in single-ingestion mode */ + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto)) return ERROR(memory_allocation); - FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */ + FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , ""); /* only expands if necessary */ { unsigned u; for (u=0; u<nbJobs; u++) { size_t const jobSize = MIN(remainingSrcSize, avgJobSize); size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); @@ -1386,23 +1425,23 @@ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ /* init */ if (params.nbWorkers != mtctx->params.nbWorkers) - FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) ); + FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , ""); if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; - if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; + if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX; mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ if (mtctx->singleBlockingThread) { - ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); + ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(&params); DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); assert(singleThreadParams.nbWorkers == 0); return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], dict, dictSize, cdict, - singleThreadParams, pledgedSrcSize); + &singleThreadParams, pledgedSrcSize); } DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ @@ -1424,16 +1463,18 @@ ZSTD_freeCDict(mtctx->cdictLocal); mtctx->cdictLocal = NULL; mtctx->cdict = cdict; } - mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(params); + mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params); DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10)); mtctx->targetSectionSize = params.jobSize; if (mtctx->targetSectionSize == 0) { - mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); + mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params); } + assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX); + if (params.rsyncable) { /* Aim for the targetsectionSize as the average job size. */ U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20); U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20; assert(jobSizeMB >= 1); @@ -1481,11 +1522,12 @@ mtctx->nextJobID = 0; mtctx->frameEnded = 0; mtctx->allJobsCompleted = 0; mtctx->consumed = 0; mtctx->produced = 0; - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize)) + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize, + dict, dictSize, dictContentType)) return ERROR(memory_allocation); return 0; } size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, @@ -1695,13 +1737,15 @@ DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); assert(mtctx->jobs[wJobID].dstBuff.start != NULL); - memcpy((char*)output->dst + output->pos, - (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, - toFlush); + if (toFlush > 0) { + memcpy((char*)output->dst + output->pos, + (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, + toFlush); + } output->pos += toFlush; mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ if ( (srcConsumed == srcSize) /* job is completed */ && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ @@ -1767,11 +1811,11 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) { BYTE const* const bufferStart = (BYTE const*)buffer.start; BYTE const* const bufferEnd = bufferStart + buffer.capacity; BYTE const* const rangeStart = (BYTE const*)range.start; - BYTE const* const rangeEnd = rangeStart + range.size; + BYTE const* const rangeEnd = range.size != 0 ? rangeStart + range.size : rangeStart; if (rangeStart == NULL || bufferStart == NULL) return 0; /* Empty ranges cannot overlap */ if (bufferStart == bufferEnd || rangeStart == rangeEnd) @@ -2041,11 +2085,11 @@ || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */ || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */ size_t const jobSize = mtctx->inBuff.filled; assert(mtctx->inBuff.filled <= mtctx->targetSectionSize); - FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) ); + FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , ""); } /* check for potential compressed data ready to be flushed */ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ @@ -2055,11 +2099,11 @@ } size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) ); + FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , ""); /* recommended next input size : fill current input buffer */ return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ } @@ -2072,10 +2116,10 @@ if ( mtctx->jobReady /* one job ready for a worker to pick up */ || (srcSize > 0) /* still some data within input buffer */ || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", (U32)srcSize, (U32)endFrame); - FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); + FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , ""); } /* check if there is any data available to flush */ return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); }