contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3 vs contrib/zstd/lib/compress/zstdmt_compress.c in extzstd-0.3.1
- old
+ new
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
+ * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
@@ -20,13 +20,13 @@
/* ====== Dependencies ====== */
#include <string.h> /* memcpy, memset */
#include <limits.h> /* INT_MAX, UINT_MAX */
-#include "mem.h" /* MEM_STATIC */
-#include "pool.h" /* threadpool */
-#include "threading.h" /* mutex */
+#include "../common/mem.h" /* MEM_STATIC */
+#include "../common/pool.h" /* threadpool */
+#include "../common/threading.h" /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstd_ldm.h"
#include "zstdmt_compress.h"
/* Guards code to support resizing the SeqPool.
@@ -459,11 +459,17 @@
ZSTD_pthread_mutex_t ldmWindowMutex;
ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
} serialState_t;
-static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
+static int
+ZSTDMT_serialState_reset(serialState_t* serialState,
+ ZSTDMT_seqPool* seqPool,
+ ZSTD_CCtx_params params,
+ size_t jobSize,
+ const void* dict, size_t const dictSize,
+ ZSTD_dictContentType_e dictContentType)
{
/* Adjust parameters */
if (params.ldmParams.enableLdm) {
DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams);
@@ -488,12 +494,11 @@
serialState->params.ldmParams.hashLog -
serialState->params.ldmParams.bucketSizeLog;
/* Size the seq pool tables */
ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
/* Reset the window */
- ZSTD_window_clear(&serialState->ldmState.window);
- serialState->ldmWindow = serialState->ldmState.window;
+ ZSTD_window_init(&serialState->ldmState.window);
/* Resize tables and output space if necessary. */
if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {
ZSTD_free(serialState->ldmState.hashTable, cMem);
serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
}
@@ -504,11 +509,28 @@
if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
return 1;
/* Zero the tables */
memset(serialState->ldmState.hashTable, 0, hashSize);
memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
+
+ /* Update window state and fill hash table with dict */
+ serialState->ldmState.loadedDictEnd = 0;
+ if (dictSize > 0) {
+ if (dictContentType == ZSTD_dct_rawContent) {
+ BYTE const* const dictEnd = (const BYTE*)dict + dictSize;
+ ZSTD_window_update(&serialState->ldmState.window, dict, dictSize);
+ ZSTD_ldm_fillHashTable(&serialState->ldmState, (const BYTE*)dict, dictEnd, ¶ms.ldmParams);
+ serialState->ldmState.loadedDictEnd = params.forceWindow ? 0 : (U32)(dictEnd - serialState->ldmState.window.base);
+ } else {
+ /* don't even load anything */
+ }
+ }
+
+ /* Initialize serialState's copy of ldmWindow. */
+ serialState->ldmWindow = serialState->ldmState.window;
}
+
serialState->params = params;
serialState->params.jobSize = (U32)jobSize;
return 0;
}
@@ -666,11 +688,11 @@
jobParams.ldmParams.enableLdm = 0;
/* init */
if (job->cdict) {
- size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
+ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
assert(job->firstJob); /* only allowed for first job */
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} else { /* srcStart points at reloaded section */
U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
{ size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
@@ -678,11 +700,11 @@
}
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
ZSTD_dtlm_fast,
NULL, /*cdict*/
- jobParams, pledgedSrcSize);
+ &jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} }
/* Perform serial step as early as possible, but after CCtx initialization */
ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
@@ -925,16 +947,22 @@
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{
unsigned jobID;
DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
+ /* Copy the mutex/cond out */
+ ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
+ ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
+
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
- mtctx->jobs[jobID].dstBuff = g_nullBuffer;
- mtctx->jobs[jobID].cSize = 0;
+
+ /* Clear the job description, but keep the mutex/cond */
+ memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
+ mtctx->jobs[jobID].job_mutex = mutex;
+ mtctx->jobs[jobID].job_cond = cond;
}
- memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
mtctx->allJobsCompleted = 1;
}
@@ -1026,13 +1054,13 @@
}
}
/* Sets parameters relevant to the compression job,
* initializing others to default values. */
-static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
+static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params)
{
- ZSTD_CCtx_params jobParams = params;
+ ZSTD_CCtx_params jobParams = *params;
/* Clear parameters related to multithreading */
jobParams.forceWindow = 0;
jobParams.nbWorkers = 0;
jobParams.jobSize = 0;
jobParams.overlapLog = 0;
@@ -1046,11 +1074,11 @@
/* ZSTDMT_resize() :
* @return : error code if fails, 0 on success */
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
- FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+ FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
@@ -1068,11 +1096,11 @@
U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */
int const compressionLevel = cctxParams->compressionLevel;
DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",
compressionLevel);
mtctx->params.compressionLevel = compressionLevel;
- { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, 0, 0);
+ { ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0);
cParams.windowLog = saved_wlog;
mtctx->params.cParams = cParams;
}
}
@@ -1127,13 +1155,18 @@
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
assert(flushed <= produced);
+ assert(jobPtr->consumed <= jobPtr->src.size);
toFlush = produced - flushed;
- if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
- /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+ /* if toFlush==0, nothing is available to flush.
+ * However, jobID is expected to still be active:
+ * if jobID was already completed and fully flushed,
+ * ZSTDMT_flushProduced() should have already moved onto next job.
+ * Therefore, some input has not yet been consumed. */
+ if (toFlush==0) {
assert(jobPtr->consumed < jobPtr->src.size);
}
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}
@@ -1144,18 +1177,22 @@
/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
-static unsigned ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
+static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{
- if (params.ldmParams.enableLdm)
+ unsigned jobLog;
+ if (params->ldmParams.enableLdm) {
/* In Long Range Mode, the windowLog is typically oversized.
* In which case, it's preferable to determine the jobSize
* based on chainLog instead. */
- return MAX(21, params.cParams.chainLog + 4);
- return MAX(20, params.cParams.windowLog + 2);
+ jobLog = MAX(21, params->cParams.chainLog + 4);
+ } else {
+ jobLog = MAX(20, params->cParams.windowLog + 2);
+ }
+ return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);
}
static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
{
switch(strat)
@@ -1182,31 +1219,31 @@
assert(0 <= ovlog && ovlog <= 9);
if (ovlog == 0) return ZSTDMT_overlapLog_default(strat);
return ovlog;
}
-static size_t ZSTDMT_computeOverlapSize(ZSTD_CCtx_params const params)
+static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
{
- int const overlapRLog = 9 - ZSTDMT_overlapLog(params.overlapLog, params.cParams.strategy);
- int ovLog = (overlapRLog >= 8) ? 0 : (params.cParams.windowLog - overlapRLog);
+ int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);
+ int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);
assert(0 <= overlapRLog && overlapRLog <= 8);
- if (params.ldmParams.enableLdm) {
+ if (params->ldmParams.enableLdm) {
/* In Long Range Mode, the windowLog is typically oversized.
* In which case, it's preferable to determine the jobSize
* based on chainLog instead.
* Then, ovLog becomes a fraction of the jobSize, rather than windowSize */
- ovLog = MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
+ ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
- overlapRLog;
}
- assert(0 <= ovLog && ovLog <= 30);
- DEBUGLOG(4, "overlapLog : %i", params.overlapLog);
+ assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);
+ DEBUGLOG(4, "overlapLog : %i", params->overlapLog);
DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
return (ovLog==0) ? 0 : (size_t)1 << ovLog;
}
static unsigned
-ZSTDMT_computeNbJobs(ZSTD_CCtx_params params, size_t srcSize, unsigned nbWorkers)
+ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers)
{
assert(nbWorkers>0);
{ size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params);
size_t const jobMaxSize = jobSizeTarget << 2;
size_t const passSizeMax = jobMaxSize * nbWorkers;
@@ -1218,20 +1255,21 @@
} }
/* ZSTDMT_compress_advanced_internal() :
* This is a blocking function : it will only give back control to caller after finishing its compression job.
*/
-static size_t ZSTDMT_compress_advanced_internal(
+static size_t
+ZSTDMT_compress_advanced_internal(
ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params)
{
- ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
- size_t const overlapSize = ZSTDMT_computeOverlapSize(params);
- unsigned const nbJobs = ZSTDMT_computeNbJobs(params, srcSize, params.nbWorkers);
+ ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(¶ms);
+ size_t const overlapSize = ZSTDMT_computeOverlapSize(¶ms);
+ unsigned const nbJobs = ZSTDMT_computeNbJobs(¶ms, srcSize, params.nbWorkers);
size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src;
size_t remainingSrcSize = srcSize;
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
@@ -1245,19 +1283,20 @@
if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode");
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
- return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
+ return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams);
}
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
- if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
+ /* LDM doesn't even try to load the dictionary in single-ingestion mode */
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto))
return ERROR(memory_allocation);
- FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */
+ FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , ""); /* only expands if necessary */
{ unsigned u;
for (u=0; u<nbJobs; u++) {
size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
@@ -1386,23 +1425,23 @@
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
/* init */
if (params.nbWorkers != mtctx->params.nbWorkers)
- FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) );
+ FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , "");
if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
- if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
+ if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX;
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (mtctx->singleBlockingThread) {
- ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
+ ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(¶ms);
DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbWorkers == 0);
return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict,
- singleThreadParams, pledgedSrcSize);
+ &singleThreadParams, pledgedSrcSize);
}
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
@@ -1424,16 +1463,18 @@
ZSTD_freeCDict(mtctx->cdictLocal);
mtctx->cdictLocal = NULL;
mtctx->cdict = cdict;
}
- mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(params);
+ mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(¶ms);
DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
mtctx->targetSectionSize = params.jobSize;
if (mtctx->targetSectionSize == 0) {
- mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+ mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(¶ms);
}
+ assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+
if (params.rsyncable) {
/* Aim for the targetsectionSize as the average job size. */
U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20;
assert(jobSizeMB >= 1);
@@ -1481,11 +1522,12 @@
mtctx->nextJobID = 0;
mtctx->frameEnded = 0;
mtctx->allJobsCompleted = 0;
mtctx->consumed = 0;
mtctx->produced = 0;
- if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
+ dict, dictSize, dictContentType))
return ERROR(memory_allocation);
return 0;
}
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
@@ -1695,13 +1737,15 @@
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
(U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);
assert(mtctx->doneJobID < mtctx->nextJobID);
assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
assert(mtctx->jobs[wJobID].dstBuff.start != NULL);
- memcpy((char*)output->dst + output->pos,
- (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
- toFlush);
+ if (toFlush > 0) {
+ memcpy((char*)output->dst + output->pos,
+ (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
+ toFlush);
+ }
output->pos += toFlush;
mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */
if ( (srcConsumed == srcSize) /* job is completed */
&& (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */
@@ -1767,11 +1811,11 @@
static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
{
BYTE const* const bufferStart = (BYTE const*)buffer.start;
BYTE const* const bufferEnd = bufferStart + buffer.capacity;
BYTE const* const rangeStart = (BYTE const*)range.start;
- BYTE const* const rangeEnd = rangeStart + range.size;
+ BYTE const* const rangeEnd = range.size != 0 ? rangeStart + range.size : rangeStart;
if (rangeStart == NULL || bufferStart == NULL)
return 0;
/* Empty ranges cannot overlap */
if (bufferStart == bufferEnd || rangeStart == rangeEnd)
@@ -2041,11 +2085,11 @@
|| (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */
|| ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */
|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */
size_t const jobSize = mtctx->inBuff.filled;
assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
- FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
+ FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
}
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
@@ -2055,11 +2099,11 @@
}
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
- FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
+ FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , "");
/* recommended next input size : fill current input buffer */
return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}
@@ -2072,10 +2116,10 @@
if ( mtctx->jobReady /* one job ready for a worker to pick up */
|| (srcSize > 0) /* still some data within input buffer */
|| ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
(U32)srcSize, (U32)endFrame);
- FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
+ FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , "");
}
/* check if there is any data available to flush */
return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame);
}