
google_cloud_kms_symmetric_encryption_with_crc32c_integrity.py

python

Encrypts a plaintext string using a symmetric encryption key in Google Cloud KMS.

google_cloud_kms_symmetric_encryption_with_crc32c_integrity.py
1import hashlib
3# Import the client library.
4from google.cloud import kms
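def encrypt_symmetric(
    project_id: str, location_id: str, key_ring_id: str, key_id: str, plaintext: str
) -> kms.EncryptResponse:
    """
    Encrypts input plaintext data using the specified symmetric Cloud KMS key.

    The plaintext's CRC32C is sent with the request and both checksums in the
    response are verified, so corruption in either direction is detected.
    CRC32C is a non-cryptographic checksum: it catches accidental corruption
    in transit and is not a defence against deliberate tampering.

    Args:
        project_id (string): Google Cloud project ID (e.g. 'my-project').
        location_id (string): Cloud KMS location (e.g. 'us-east1').
        key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
        key_id (string): ID of the key to use (e.g. 'my-key').
        plaintext (string): The plaintext data to encrypt.

    Returns:
        kms.EncryptResponse: The response from the KMS API.

    Raises:
        Exception: If the server did not confirm the plaintext checksum, or
            if the returned ciphertext does not match its checksum.
    """

    # Create the client.
    client = kms.KeyManagementServiceClient()

    # Build the key name.
    key_name = client.crypto_key_path(project_id, location_id, key_ring_id, key_id)

    # Convert the plaintext to bytes.
    plaintext_bytes = plaintext.encode("utf-8")

    # Compute the plaintext's CRC32C and send it with the request. The
    # response check on verified_plaintext_crc32c below can only succeed when
    # this field is populated; leaving it out makes every call fail that check.
    # See crc32c() function definition below (requires the google-crc32c package).
    plaintext_crc32c = crc32c(plaintext_bytes)

    # Call the API.
    encrypt_response = client.encrypt(
        request={
            "name": key_name,
            "plaintext": plaintext_bytes,
            "plaintext_crc32c": plaintext_crc32c,
        }
    )

    # Perform integrity verification on encrypt_response.
    # For more details on ensuring E2E in-transit integrity, see:
    # https://cloud.google.com/kms/docs/data-integrity-guidelines
    #
    # verified_plaintext_crc32c is the server's confirmation that the
    # plaintext it received matched the checksum sent above.
    if not encrypt_response.verified_plaintext_crc32c:
        raise Exception("The request sent to the server was corrupted in-transit.")
    # Recompute the checksum locally over the ciphertext that actually arrived.
    if encrypt_response.ciphertext_crc32c != crc32c(encrypt_response.ciphertext):
        raise Exception("The response received from the server was corrupted in-transit.")

    print(f"Ciphertext: {encrypt_response.ciphertext!r}")
    return encrypt_response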
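def crc32c(data: bytes) -> int:
    """
    Calculates the CRC32C checksum of the provided data.

    Note: This requires the `google-crc32c` Python package. It is imported
    inside the function so the module can be loaded without it.

    Args:
        data (bytes): The bytes to checksum.

    Returns:
        int: The CRC32C of `data` as an unsigned integer, the form the KMS
        request and response checksum fields are compared against.
    """
    import google_crc32c

    # The one-shot integer checksum is the module-level value(); Checksum
    # objects expose digest()/hexdigest() (bytes / hex string), not an int.
    return google_crc32c.value(data)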