|
| 1 | +import json |
| 2 | +import os |
1 | 3 | import time |
2 | 4 | from typing import Generator |
3 | | -from unittest.mock import patch |
| 5 | +from unittest.mock import mock_open, patch |
4 | 6 |
|
5 | 7 | from pytest import fixture, mark |
6 | 8 |
|
7 | 9 | from firebolt.utils.cache import ( |
8 | 10 | CACHE_EXPIRY_SECONDS, |
9 | 11 | ConnectionInfo, |
| 12 | + FileBasedCache, |
10 | 13 | SecureCacheKey, |
11 | 14 | UtilCache, |
12 | 15 | ) |
| 16 | +from firebolt.utils.file_operations import FernetEncrypter, generate_salt |
13 | 17 |
|
14 | 18 |
|
15 | 19 | @fixture |
@@ -78,6 +82,26 @@ def test_string(): |
78 | 82 | return "test_value" |
79 | 83 |
|
80 | 84 |
|
| 85 | +@fixture |
| 86 | +def file_based_cache() -> Generator[FileBasedCache, None, None]: |
| 87 | + """Create a fresh FileBasedCache instance for testing.""" |
| 88 | + memory_cache = UtilCache[ConnectionInfo](cache_name="test_memory_cache") |
| 89 | + memory_cache.enable() |
| 90 | + cache = FileBasedCache(memory_cache, cache_name="test_file_cache") |
| 91 | + cache.enable() |
| 92 | + yield cache |
| 93 | + cache.clear() |
| 94 | + |
| 95 | + |
| 96 | +@fixture |
| 97 | +def encrypter_with_key(): |
| 98 | + """Create a FernetEncrypter instance for testing.""" |
| 99 | + from firebolt.utils.file_operations import FernetEncrypter, generate_salt |
| 100 | + |
| 101 | + salt = generate_salt() |
| 102 | + return FernetEncrypter(salt, "test_encryption_key") |
| 103 | + |
| 104 | + |
81 | 105 | def test_cache_set_and_get(cache, sample_cache_key, sample_connection_info): |
82 | 106 | """Test basic cache set and get operations.""" |
83 | 107 | # Test cache miss initially |
@@ -415,3 +439,124 @@ def test_connection_info_post_init(): |
415 | 439 | assert connection_info2.system_engine is engine_obj |
416 | 440 | assert connection_info2.databases["db1"] is db_obj |
417 | 441 | assert connection_info2.engines["engine1"] is engine_obj |
| 442 | + |
| 443 | + |
| 444 | +@mark.nofakefs |
| 445 | +def test_file_based_cache_read_data_json_file_not_exists( |
| 446 | + file_based_cache, encrypter_with_key |
| 447 | +): |
| 448 | + """Test _read_data_json returns empty dict when file doesn't exist.""" |
| 449 | + # Test with a non-existent file path |
| 450 | + result = file_based_cache._read_data_json( |
| 451 | + "/path/to/nonexistent/file.txt", encrypter_with_key |
| 452 | + ) |
| 453 | + assert result == {} |
| 454 | + |
| 455 | + |
| 456 | +def test_file_based_cache_read_data_json_valid_data( |
| 457 | + file_based_cache, encrypter_with_key |
| 458 | +): |
| 459 | + """Test _read_data_json successfully reads and decrypts valid JSON data.""" |
| 460 | + # Create test data |
| 461 | + test_data = {"id": "test_connection", "token": "test_token"} |
| 462 | + test_file_path = "/test_cache/valid_data.txt" |
| 463 | + |
| 464 | + # Create directory and file with encrypted JSON data |
| 465 | + os.makedirs(os.path.dirname(test_file_path), exist_ok=True) |
| 466 | + json_str = json.dumps(test_data) |
| 467 | + encrypted_data = encrypter_with_key.encrypt(json_str) |
| 468 | + |
| 469 | + with open(test_file_path, "w") as f: |
| 470 | + f.write(encrypted_data) |
| 471 | + |
| 472 | + # Test reading the valid encrypted data |
| 473 | + result = file_based_cache._read_data_json(test_file_path, encrypter_with_key) |
| 474 | + assert result == test_data |
| 475 | + assert result["id"] == "test_connection" |
| 476 | + assert result["token"] == "test_token" |
| 477 | + |
| 478 | + |
| 479 | +def test_file_based_cache_read_data_json_decryption_failure(file_based_cache): |
| 480 | + """Test _read_data_json returns empty dict when decryption fails.""" |
| 481 | + # Create encrypters with different keys |
| 482 | + salt = generate_salt() |
| 483 | + encrypter1 = FernetEncrypter(salt, "test_key_1") |
| 484 | + encrypter2 = FernetEncrypter(salt, "test_key_2") # Different key |
| 485 | + |
| 486 | + test_file_path = "/test_cache/decryption_test.txt" |
| 487 | + |
| 488 | + # Create directory and file with data encrypted by encrypter1 |
| 489 | + os.makedirs(os.path.dirname(test_file_path), exist_ok=True) |
| 490 | + encrypted_data = encrypter1.encrypt('{"test": "data"}') |
| 491 | + |
| 492 | + with open(test_file_path, "w") as f: |
| 493 | + f.write(encrypted_data) |
| 494 | + |
| 495 | + # Try to decrypt with encrypter2 (should fail) |
| 496 | + result = file_based_cache._read_data_json(test_file_path, encrypter2) |
| 497 | + assert result == {} |
| 498 | + |
| 499 | + |
| 500 | +def test_file_based_cache_read_data_json_invalid_json( |
| 501 | + file_based_cache, encrypter_with_key |
| 502 | +): |
| 503 | + """Test _read_data_json returns empty dict when JSON is invalid.""" |
| 504 | + test_file_path = "/test_cache/invalid_json.txt" |
| 505 | + |
| 506 | + # Create directory and file with encrypted invalid JSON |
| 507 | + os.makedirs(os.path.dirname(test_file_path), exist_ok=True) |
| 508 | + invalid_json = "invalid json data {{" |
| 509 | + encrypted_data = encrypter_with_key.encrypt(invalid_json) |
| 510 | + |
| 511 | + with open(test_file_path, "w") as f: |
| 512 | + f.write(encrypted_data) |
| 513 | + |
| 514 | + # Test reading the invalid JSON |
| 515 | + result = file_based_cache._read_data_json(test_file_path, encrypter_with_key) |
| 516 | + assert result == {} |
| 517 | + |
| 518 | + |
| 519 | +@mark.nofakefs |
| 520 | +def test_file_based_cache_read_data_json_io_error(file_based_cache, encrypter_with_key): |
| 521 | + """Test _read_data_json returns empty dict when IOError occurs.""" |
| 522 | + # Mock open to raise IOError |
| 523 | + with patch("builtins.open", mock_open()) as mock_file: |
| 524 | + mock_file.side_effect = IOError("File read error") |
| 525 | + |
| 526 | + result = file_based_cache._read_data_json("test_file.txt", encrypter_with_key) |
| 527 | + assert result == {} |
| 528 | + |
| 529 | + |
| 530 | +def test_file_based_cache_read_data_json_empty_encrypted_data( |
| 531 | + file_based_cache, encrypter_with_key |
| 532 | +): |
| 533 | + """Test _read_data_json handles empty encrypted data.""" |
| 534 | + test_file_path = "/test_cache/empty_data.txt" |
| 535 | + |
| 536 | + # Create directory and file with empty encrypted data |
| 537 | + os.makedirs(os.path.dirname(test_file_path), exist_ok=True) |
| 538 | + encrypted_empty = encrypter_with_key.encrypt("") |
| 539 | + |
| 540 | + with open(test_file_path, "w") as f: |
| 541 | + f.write(encrypted_empty) |
| 542 | + |
| 543 | + # Test reading empty decrypted data |
| 544 | + result = file_based_cache._read_data_json(test_file_path, encrypter_with_key) |
| 545 | + assert result == {} |
| 546 | + |
| 547 | + |
| 548 | +def test_file_based_cache_read_data_json_invalid_encrypted_format( |
| 549 | + file_based_cache, encrypter_with_key |
| 550 | +): |
| 551 | + """Test _read_data_json handles invalid encrypted data format.""" |
| 552 | + test_file_path = "/test_cache/invalid_encrypted.txt" |
| 553 | + |
| 554 | + # Create directory and file with invalid encrypted data format |
| 555 | + os.makedirs(os.path.dirname(test_file_path), exist_ok=True) |
| 556 | + |
| 557 | + with open(test_file_path, "w") as f: |
| 558 | + f.write("not_encrypted_data_at_all") |
| 559 | + |
| 560 | + # Test reading invalid encrypted format |
| 561 | + result = file_based_cache._read_data_json(test_file_path, encrypter_with_key) |
| 562 | + assert result == {} |
0 commit comments