ext/libmongocrypt/libmongocrypt/etc/fle2_crypto.py in libmongocrypt-helper-1.7.4.0.1002 vs ext/libmongocrypt/libmongocrypt/etc/fle2_crypto.py in libmongocrypt-helper-1.8.0.0.1001
- old
+ new
@@ -6,10 +6,11 @@
ENCRYPTION_KEY_LENGTH = 32
MAC_KEY_LENGTH = 32
HMAC_SHA256_TAG_LENGTH = 32
IV_LENGTH = 16
DEK_LENGTH = 96
+BLOCK_LENGTH = 16
def _hmacsha256 (Km, input):
assert (len(Km) == MAC_KEY_LENGTH)
hm = hmac.HMAC(Km, hashes.SHA256())
hm.update (input)
@@ -23,83 +24,94 @@
assert (len(bytesIn) == DEK_LENGTH)
self.Ke = bytesIn[0:32]
self.Km = bytesIn[32:64]
self.TokenKey = bytesIn[64:96]
-def fle2_encrypt (M, Ke, IV):
+def _fle2_encrypt (IV, Ke, M, mode, Km = None, AD = None):
"""
- Compute
- S = AES-CTR.Enc(Ke, IV, M)
-
- Output
- C = IV || S
+ Generalized encrypt vector create.
+ S = AES-{mode}.Enc(Ke, IV, M)
+ if Km is not None:
+ T = HMAC/SHA-256(Km, AD || IV || S)
+ C = IV || S || T
"""
- assert (len(Ke) == ENCRYPTION_KEY_LENGTH)
- assert (len(IV) == IV_LENGTH)
+ assert len(Ke) == ENCRYPTION_KEY_LENGTH
+ assert len(IV) == IV_LENGTH
+ assert (mode == 'CTR') or (mode == 'CBC')
+ modeObj = modes.CTR(IV) if mode == 'CTR' else modes.CBC(IV)
- # S = AES-CTR.Enc(Ke, IV, M)
- cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV))
+ # S = AES-{mode}.Enc(Ke, IV, M)
+ cipher = Cipher(algorithms.AES(Ke), modeObj)
encryptor = cipher.encryptor()
+ if mode == 'CBC':
+ # PKCS#7
+ padding_len = BLOCK_LENGTH - (len(M) % BLOCK_LENGTH)
+ M = M + (padding_len.to_bytes(1, 'big') * padding_len)
S = encryptor.update(M) + encryptor.finalize()
- C = IV + S
+ if Km is not None:
+ assert AD is not None
+ assert len(Km) == MAC_KEY_LENGTH
+ # T = HMAC-SHA256(Km, AD || IV || S)
+ T = _hmacsha256 (Km, AD + IV + S)
+ else:
+ assert AD is None
+ T = b''
+
+ # C = IV + S + T
+ C = IV + S + T
return C
-def fle2_decrypt (C, Ke):
- assert (len(Ke) == ENCRYPTION_KEY_LENGTH)
- assert (len(C) > IV_LENGTH)
- IV = C[0:IV_LENGTH]
- # S = AES-CTR.Enc(Ke, IV, M)
- cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV))
- encryptor = cipher.decryptor()
- M = encryptor.update(C[IV_LENGTH:]) + encryptor.finalize()
- return M
+def fle2_encrypt (M, Ke, IV):
+ """ AES-256-CTR/NONE """
+ return _fle2_encrypt (IV, Ke, M, 'CTR')
def fle2aead_encrypt(M, Ke, IV, Km, AD):
- """
- Do FLE 2 AEAD encryption.
- See [AEAD with CTR](https://docs.google.com/document/d/1eCU7R8Kjr-mdyz6eKvhNIDVmhyYQcAaLtTfHeK7a_vE/edit#heading=h.35kjadvlcbty)
- See [aead_encryption_fle2_test_vectors.sh](https://github.com/mongodb/mongo/blob/ecc66915ac757cbeaa7c40eb443d7ec7bffcb80a/src/mongo/crypto/scripts/aead_encryption_fle2_test_vectors.sh#L15) for how server team is generating this.
- """
- assert (len(Ke) == ENCRYPTION_KEY_LENGTH)
- assert (len(IV) == IV_LENGTH)
- assert (len(Km) == MAC_KEY_LENGTH)
+ """ AES-256-CTR/SHA-256 """
+ return _fle2_encrypt (IV, Ke, M, 'CTR', Km, AD)
- # S = AES-CTR.Enc(Ke, IV, M)
- cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV))
- encryptor = cipher.encryptor()
- S = encryptor.update(M) + encryptor.finalize()
+def fle2v2_aead_encrypt(M, Ke, IV, Km, AD):
+ """ AES-256-CBC/SHA-256 """
+ return _fle2_encrypt (IV, Ke, M, 'CBC', Km, AD)
- # T = HMAC-SHA256(Km, AD || IV || S)
- T = _hmacsha256 (Km, AD + IV + S)
+def _fle2_decrypt (C, Ke, mode, Km = None, AD = None):
+ assert (len(Ke) == ENCRYPTION_KEY_LENGTH)
+ Tlen = 0 if Km is None else HMAC_SHA256_TAG_LENGTH;
+ assert (len(C) > (IV_LENGTH + Tlen))
# C = IV || S || T
- C = IV + S + T
- return C
+ IV = C[0:IV_LENGTH]
+ S = C[IV_LENGTH:-Tlen]
+ if Km is not None:
+ T = C[-Tlen:]
+ assert T == _hmacsha256 (Km, AD + IV + S)
-def fle2aead_decrypt(C, Km, AD, Ke):
- assert (len(Ke) == ENCRYPTION_KEY_LENGTH)
- assert (len(C) > HMAC_SHA256_TAG_LENGTH + IV_LENGTH)
- assert (len(Km) == MAC_KEY_LENGTH)
+ assert (mode == 'CTR') or (mode == 'CBC')
+ modeObj = modes.CTR(IV) if mode == 'CTR' else modes.CBC(IV)
- # Parse C as IV || S || T
- IV = C[0:IV_LENGTH]
- S = C[IV_LENGTH:-HMAC_SHA256_TAG_LENGTH]
- T = C[-HMAC_SHA256_TAG_LENGTH:]
+ # M = AES-{mode}.Dec(Ke, IV, S)
+ cipher = Cipher(algorithms.AES(Ke), modeObj)
+ encryptor = cipher.decryptor()
+ if mode == 'CBC':
+ # PKCS#7
+ padding_len = ord(S[-1:])
+ S = S[-padding_len:]
+ M = encryptor.update(S) + encryptor.finalize()
- # Compute T' = HMAC-SHA256(Km, AD || IV || S)
- Tp = _hmacsha256 (Km, AD + IV + S)
- if Tp != T:
- raise Exception("decryption error")
+ return M
- # Else compute and output M = AES-CTR.Dec(Ke, S)
- cipher = Cipher(algorithms.AES(Ke), modes.CTR(IV))
- decryptor = cipher.decryptor()
- M = decryptor.update(S) + decryptor.finalize()
+def fle2_decrypt (C, Ke):
+ """AES-256-CTR/NONE"""
+ return _fle2_decrypt (C, Ke, 'CTR')
- return M
+def fle2aead_decrypt(C, Km, AD, Ke):
+ """AES-256-CTR/SHA-256"""
+ return _fle2_decrypt (C, Ke, 'CTR', Km, AD)
+def fle2v2_aead_decrypt(C, Km, AD, Ke):
+ """AES-256-CBC/SHA-256"""
+ return _fle2_decrypt (C, Ke, 'CBC', Km, AD)
def ServerDataEncryptionLevel1Token (rootKey):
- return _hmacsha256 (rootKey, struct.pack("<Q", 3))
\ No newline at end of file
+ return _hmacsha256 (rootKey, struct.pack("<Q", 3))