Store `CredentialsStolenEvent` in the Island
Summary
Receive CredentialsStolenEvents in the Island and store them in the IEventRepository
Tasks
- [ ] Create `IEventRepository` for storing events
- [ ] Remove `ITelemetryRepository`
- [ ] Implement `MongoEventRepository` for storing events in MongoDB
- [ ] Create `EventPersistor` (find a better name?) that subscribes to all events and uses `IEventRepository` to store them
- [ ] Write a decorator for `IEventRepository` that encrypts/decrypts `CredentialsStolenEvent`s
Not sure how we intend to do the encryption. Here's one option that would work for mutable types:
from pydantic import SecretBytes, SecretStr
class SubThing:
    """Nested example object that carries one secret bytes attribute."""

    def __init__(self):
        # Wrap the raw bytes so the value is recognisable as a secret.
        secret_payload = SecretBytes(b"12345")
        self.data = secret_payload
class Thing:
    """Top-level example object: one secret string plus a nested SubThing."""

    def __init__(self):
        secret_text = SecretStr("hello")
        nested = SubThing()
        self.str = secret_text
        self.sub = nested
def find_secret(obj, modify):
    """Recurse object for Secret attributes.

    Walks the instance attributes of ``obj`` and calls ``modify`` on every
    attribute value that is a ``SecretStr`` or ``SecretBytes``. Any other
    attribute value is searched recursively in the same way.

    :param obj: The object whose attributes are searched
    :param modify: A callable invoked with each secret value that is found
    """
    try:
        attributes = vars(obj)
    except TypeError:
        # Values without a __dict__ (str, int, None, list, ...) have no
        # attributes to search, so they cannot hold a nested secret here.
        # Note that the *contents* of containers are not searched.
        return

    for value in attributes.values():
        if isinstance(value, (SecretStr, SecretBytes)):
            modify(value)
        else:
            find_secret(value, modify)
def encrypt(obj):
    """Stand-in for a real encryption step: print the secret's value.

    :param obj: An object exposing ``get_secret_value()``
    """
    secret_value = obj.get_secret_value()
    print(f"Encrypting {secret_value}")
def decrypt(obj):
    """Stand-in for a real decryption step: print the secret's value.

    :param obj: An object exposing ``get_secret_value()``
    """
    secret_value = obj.get_secret_value()
    print(f"Decrypting {secret_value}")
# Demo: visit every secret on the object, first with the "encrypt" stub and
# then with the "decrypt" stub.
thing = Thing()
for action in (encrypt, decrypt):
    find_secret(thing, action)
Pydantic allows us to set the json_encoders for a model in the Config. However, it has a few drawbacks:
- It only applies to serializing, and not deserializing
- It does not allow us to set the encryptor
All that json_encoders really does is pass the encoders as an argument to json.dumps(). The BaseModel.json() method also allows users to specify an encoder, so we can do the encoding with a bit more flexibility that way. The following is a working example.
Note, however, that this handles encryption/decryption only on encode, so it has to create a model with the encrypted data, and serialize that to decrypt the data, rather than simply decrypting the data on load (model -> encrypted_json -> encrypted_model -> decrypted_json -> decrypted model, instead of model -> encrypted_json -> decrypted_model). This is because we know the type of the data on serialization, but not on deserialization.
from pydantic import BaseModel, SecretBytes, SecretStr
from monkey_island.cc.server_utils.encryption import (
EncryptionKey32Bytes,
IEncryptor,
KeyBasedEncryptor,
)
import json
class SubThingModel(BaseModel):
    """Nested example model with a single secret bytes field."""

    # Declared as SecretBytes so the encoder can recognise it by type.
    data: SecretBytes
class ThingModel(BaseModel):
    """Top-level example model: a secret string plus a nested sub-model."""

    # Declared as SecretStr so the encoder can recognise it by type.
    str: SecretStr
    # Nested model; its own secret field is handled by the same encoder.
    sub: SubThingModel
class SecretEncryptingDecoder(json.JSONEncoder):
    """JSON encoder whose ``default`` hook encrypts pydantic secret values.

    Despite the "Decoder" in its name, this class is a ``json.JSONEncoder``.
    Its ``default`` method can also be passed on its own as the ``encoder``
    argument of ``BaseModel.json()``.
    """

    def __init__(self, encryptor: IEncryptor):
        """
        :param encryptor: Encryptor used to encrypt the secret values
        """
        # Initialise the JSONEncoder state so that encode()/iterencode() work
        # on this instance, not just the bare ``default`` hook.
        super().__init__()
        self._encryptor = encryptor

    def default(self, obj):
        """Return a JSON-serializable, encrypted representation of a secret.

        :param obj: The object that the JSON machinery could not serialize
        :return: The encrypted secret, decoded to ``str`` as UTF-8
        :raises TypeError: If ``obj`` is not a ``SecretBytes`` or ``SecretStr``
        """
        # NOTE(review): decoding the ciphertext as UTF-8 assumes the encryptor
        # returns text-safe bytes (e.g. base64) — confirm against IEncryptor.
        if isinstance(obj, SecretBytes):
            return str(self._encryptor.encrypt(obj.get_secret_value()), "utf-8")
        if isinstance(obj, SecretStr):
            return str(self._encryptor.encrypt(obj.get_secret_value().encode()), "utf-8")

        # Anything else is not ours to handle: defer to the standard error.
        return super().default(obj)
class SecretDecryptingDecoder(json.JSONDecoder):
    """Provides a ``default`` hook that decrypts pydantic secret values.

    The decryption happens while *encoding* a model whose secret fields hold
    encrypted data. The ``default`` method is meant to be passed as the
    ``encoder`` argument of ``BaseModel.json()``.
    """

    def __init__(self, encryptor: IEncryptor):
        """
        :param encryptor: Encryptor used to decrypt the secret values
        """
        # Initialise the JSONDecoder state so the instance is fully usable.
        super().__init__()
        self._encryptor = encryptor

    def default(self, obj):
        """Return the decrypted, JSON-serializable value of an encrypted secret.

        :param obj: The object that the JSON machinery could not serialize
        :return: The decrypted secret, decoded to ``str`` as UTF-8
        :raises TypeError: If ``obj`` is not a ``SecretBytes`` or ``SecretStr``
        """
        if isinstance(obj, SecretBytes):
            return str(self._encryptor.decrypt(obj.get_secret_value()), "utf-8")
        if isinstance(obj, SecretStr):
            decrypted_bytes = self._encryptor.decrypt(obj.get_secret_value().encode())
            return str(decrypted_bytes, "utf-8")

        # This class is a JSONDecoder, so it has no inherited ``default`` to
        # defer to. Raise the same error JSONEncoder.default would raise.
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
# Demo script: round-trip a model through encrypted JSON and back.
# Flow: model -> encrypted JSON -> model holding ciphertext -> decrypted JSON
# -> decrypted model.

# Build a nested model; the fields are declared as SecretBytes / SecretStr.
sub_model = SubThingModel(data=b"12345")
model = ThingModel(str="abc123", sub=sub_model)
# Hard-coded key, for demonstration purposes only.
KEY = EncryptionKey32Bytes(
b"!\x8a\xa9\x91\xf5\x124\xfcB\xdd\xb6\xee-\x8c\x82D\xe1p\x954\r\xf4\x1d5\xa9;\xef2|\x81\xb5\x15"
)
encryptor = KeyBasedEncryptor(KEY)
encrypt_decoder = SecretEncryptingDecoder(encryptor)
decrypt_decoder = SecretDecryptingDecoder(encryptor)
# Serialize the model, passing the encrypting hook as the encoder.
encrypted_model_data = model.json(encoder=encrypt_decoder.default)
print(f"Original: {model.str.get_secret_value()}")
print(encrypted_model_data)
# Parse the encrypted JSON back into a model of the same type.
encrypted_model = ThingModel.parse_raw(encrypted_model_data)
print(f"Encrypted: {encrypted_model.str.get_secret_value()}")
# Serialize again, this time passing the decrypting hook as the encoder.
decrypted_model_data = encrypted_model.json(encoder=decrypt_decoder.default)
decrypted_model = ThingModel.parse_raw(decrypted_model_data)
print(f"Decrypted: {decrypted_model.str.get_secret_value()}")
Here's an example where we encrypt the JSON for all fields that aren't in AbstractAgentEvent:
from common.events import AbstractAgentEvent, CredentialsStolenEvent
from common.credentials import Credentials, Username, Password
from monkey_island.cc.server_utils.encryption import (
EncryptionKey32Bytes,
KeyBasedEncryptor,
)
import json
import uuid
# Prefix added to the JSON key of every field that is stored encrypted.
ENCRYPTED_PREFIX = "encrypted_"
def get_fields_to_encrypt(event: AbstractAgentEvent):
    """Return the names of the event's fields that should be encrypted.

    These are all fields of ``event`` that are not declared on
    ``AbstractAgentEvent`` itself.

    :param event: The event whose fields are inspected
    :return: A set of field names present on ``event`` but not on the base
             ``AbstractAgentEvent`` model
    """
    base_fields = set(AbstractAgentEvent.__fields__.keys())
    # Set difference, not symmetric difference: a base-only field must never
    # be reported, because the caller looks each name up in the event's data.
    return set(event.dict().keys()) - base_fields
def encrypt_event(event: AbstractAgentEvent, encrypt) -> str:
    """Serialize an event to JSON, encrypting its event-specific fields.

    Each field named by ``get_fields_to_encrypt`` is JSON-encoded, encrypted,
    and stored under its name prefixed with ``ENCRYPTED_PREFIX``. The
    plaintext entry is removed.

    :param event: The event to serialize
    :param encrypt: A callable that takes bytes and returns encrypted bytes
    :return: A JSON string containing the event with encrypted fields
    """
    serialized = json.loads(event.json())

    for name in get_fields_to_encrypt(event):
        plaintext = json.dumps(serialized.pop(name)).encode()
        serialized[ENCRYPTED_PREFIX + name] = str(encrypt(plaintext), "utf-8")

    return json.dumps(serialized)
def decrypt_event(json_data: str, decrypt) -> str:
    """Decrypt the encrypted fields of a JSON-serialized event.

    Every key that starts with ``ENCRYPTED_PREFIX`` is decrypted, parsed as
    JSON, and stored under its name without the prefix. The encrypted entry
    is removed.

    :param json_data: A JSON object string, as produced by ``encrypt_event``
    :param decrypt: A callable that takes bytes and returns decrypted bytes
    :return: A JSON string containing the event with decrypted fields
    """
    event_data = json.loads(json_data)

    # Iterate over a snapshot of the keys: the loop body adds and removes
    # entries, which is not safe while iterating the dict itself.
    for field in list(event_data):
        if field.startswith(ENCRYPTED_PREFIX):
            ciphertext = event_data.pop(field)
            event_data[field[len(ENCRYPTED_PREFIX) :]] = json.loads(
                str(decrypt(ciphertext.encode()), "utf-8")
            )

    return json.dumps(event_data)
# Demo script: encrypt and then decrypt the event-specific fields of a
# CredentialsStolenEvent.

# Hard-coded key, for demonstration purposes only.
KEY = EncryptionKey32Bytes(
b"!\x8a\xa9\x91\xf5\x124\xfcB\xdd\xb6\xee-\x8c\x82D\xe1p\x954\r\xf4\x1d5\xa9;\xef2|\x81\xb5\x15"
)
# Build an example event carrying one username/password credential pair.
username = Username(username="admin")
password = Password(password="easypass123")
creds = Credentials(identity=username, secret=password)
model = CredentialsStolenEvent(source=uuid.uuid4(), stolen_credentials=[creds])
# Print the plain dict form of the event for comparison.
data = model.dict()
print(data)
encryptor = KeyBasedEncryptor(KEY)
# Print the JSON with encrypted fields, then the JSON after decrypting it.
encrypted_data = encrypt_event(model, encryptor.encrypt)
print(encrypted_data)
print(decrypt_event(encrypted_data, encryptor.decrypt))