ext/libmongocrypt/libmongocrypt/etc/fle2_crypto.py in libmongocrypt-helper-1.7.4.0.1002 vs ext/libmongocrypt/libmongocrypt/etc/fle2_crypto.py in libmongocrypt-helper-1.8.0.0.1001

- old
+ new

@@ -6,10 +6,11 @@ ENCRYPTION_KEY_LENGTH = 32 MAC_KEY_LENGTH = 32 HMAC_SHA256_TAG_LENGTH = 32 IV_LENGTH = 16 DEK_LENGTH = 96 +BLOCK_LENGTH = 16 def _hmacsha256 (Km, input): assert (len(Km) == MAC_KEY_LENGTH) hm = hmac.HMAC(Km, hashes.SHA256()) hm.update (input) @@ -23,83 +24,94 @@ assert (len(bytesIn) == DEK_LENGTH) self.Ke = bytesIn[0:32] self.Km = bytesIn[32:64] self.TokenKey = bytesIn[64:96] -def fle2_encrypt (M, Ke, IV): +def _fle2_encrypt (IV, Ke, M, mode, Km = None, AD = None): """ - Compute - S = AES-CTR.Enc(Ke, IV, M) - - Output - C = IV || S + Generalized encrypt vector create. + S = AES-{mode}.Enc(Ke, IV, M) + if Km is not None: + T = HMAC/SHA-256(Km, AD || IV || S) + C = IV || S || T """ - assert (len(Ke) == ENCRYPTION_KEY_LENGTH) - assert (len(IV) == IV_LENGTH) + assert len(Ke) == ENCRYPTION_KEY_LENGTH + assert len(IV) == IV_LENGTH + assert (mode == 'CTR') or (mode == 'CBC') + modeObj = modes.CTR(IV) if mode == 'CTR' else modes.CBC(IV) - # S = AES-CTR.Enc(Ke, IV, M) - cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV)) + # S = AES-{mode}.Enc(Ke, IV, M) + cipher = Cipher(algorithms.AES(Ke), modeObj) encryptor = cipher.encryptor() + if mode == 'CBC': + # PKCS#7 + padding_len = BLOCK_LENGTH - (len(M) % BLOCK_LENGTH) + M = M + (padding_len.to_bytes(1, 'big') * padding_len) S = encryptor.update(M) + encryptor.finalize() - C = IV + S + if Km is not None: + assert AD is not None + assert len(Km) == MAC_KEY_LENGTH + # T = HMAC-SHA256(Km, AD || IV || S) + T = _hmacsha256 (Km, AD + IV + S) + else: + assert AD is None + T = b'' + + # C = IV + S + T + C = IV + S + T return C -def fle2_decrypt (C, Ke): - assert (len(Ke) == ENCRYPTION_KEY_LENGTH) - assert (len(C) > IV_LENGTH) - IV = C[0:IV_LENGTH] - # S = AES-CTR.Enc(Ke, IV, M) - cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV)) - encryptor = cipher.decryptor() - M = encryptor.update(C[IV_LENGTH:]) + encryptor.finalize() - return M +def fle2_encrypt (M, Ke, IV): + """ AES-256-CTR/NONE """ + return _fle2_encrypt (IV, Ke, M, 'CTR') def fle2aead_encrypt(M, Ke, IV, Km, AD): - """ - Do FLE 2 AEAD encryption. - See [AEAD with CTR](https://docs.google.com/document/d/1eCU7R8Kjr-mdyz6eKvhNIDVmhyYQcAaLtTfHeK7a_vE/edit#heading=h.35kjadvlcbty) - See [aead_encryption_fle2_test_vectors.sh](https://github.com/mongodb/mongo/blob/ecc66915ac757cbeaa7c40eb443d7ec7bffcb80a/src/mongo/crypto/scripts/aead_encryption_fle2_test_vectors.sh#L15) for how server team is generating this. - """ - assert (len(Ke) == ENCRYPTION_KEY_LENGTH) - assert (len(IV) == IV_LENGTH) - assert (len(Km) == MAC_KEY_LENGTH) + """ AES-256-CTR/SHA-256 """ + return _fle2_encrypt (IV, Ke, M, 'CTR', Km, AD) - # S = AES-CTR.Enc(Ke, IV, M) - cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV)) - encryptor = cipher.encryptor() - S = encryptor.update(M) + encryptor.finalize() +def fle2v2_aead_encrypt(M, Ke, IV, Km, AD): + """ AES-256-CBC/SHA-256 """ + return _fle2_encrypt (IV, Ke, M, 'CBC', Km, AD) - # T = HMAC-SHA256(Km, AD || IV || S) - T = _hmacsha256 (Km, AD + IV + S) +def _fle2_decrypt (C, Ke, mode, Km = None, AD = None): + assert (len(Ke) == ENCRYPTION_KEY_LENGTH) + Tlen = 0 if Km is None else HMAC_SHA256_TAG_LENGTH; + assert (len(C) > (IV_LENGTH + Tlen)) # C = IV || S || T - C = IV + S + T - return C + IV = C[0:IV_LENGTH] + S = C[IV_LENGTH:-Tlen] + if Km is not None: + T = C[-Tlen:] + assert T == _hmacsha256 (Km, AD + IV + S) -def fle2aead_decrypt(C, Km, AD, Ke): - assert (len(Ke) == ENCRYPTION_KEY_LENGTH) - assert (len(C) > HMAC_SHA256_TAG_LENGTH + IV_LENGTH) - assert (len(Km) == MAC_KEY_LENGTH) + assert (mode == 'CTR') or (mode == 'CBC') + modeObj = modes.CTR(IV) if mode == 'CTR' else modes.CBC(IV) - # Parse C as IV || S || T - IV = C[0:IV_LENGTH] - S = C[IV_LENGTH:-HMAC_SHA256_TAG_LENGTH] - T = C[-HMAC_SHA256_TAG_LENGTH:] + # M = AES-{mode}.Dec(Ke, IV, S) + cipher = Cipher(algorithms.AES(Ke), modeObj) + encryptor = cipher.decryptor() + if mode == 'CBC': + # PKCS#7 + padding_len = ord(S[-1:]) + S = S[-padding_len:] + M = encryptor.update(S) + encryptor.finalize() - # Compute T' = HMAC-SHA256(Km, AD || IV || S) - Tp = _hmacsha256 (Km, AD + IV + S) - if Tp != T: - raise Exception("decryption error") + return M - # Else compute and output M = AES-CTR.Dec(Ke, S) - cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV)) - decryptor = cipher.decryptor() - M = decryptor.update(S) + decryptor.finalize() +def fle2_decrypt (C, Ke): + """AES-256-CTR/NONE""" + return _fle2_decrypt (C, Ke, 'CTR') - return M +def fle2aead_decrypt(C, Km, AD, Ke): + """AES-256-CTR/SHA-256""" + return _fle2_decrypt (C, Ke, 'CTR', Km, AD) +def fle2v2_aead_decrypt(C, Km, AD, Ke): + """AES-256-CBC/SHA-256""" + return _fle2_decrypt (C, Ke, 'CBC', Km, AD) def ServerDataEncryptionLevel1Token (rootKey): - return _hmacsha256 (rootKey, struct.pack("<Q", 3)) \ No newline at end of file + return _hmacsha256 (rootKey, struct.pack("<Q", 3))