218 changes: 172 additions & 46 deletions backend/workflow_manager/endpoint_v2/source.py
@@ -66,6 +66,25 @@ class SourceConnector(BaseConnector):

READ_CHUNK_SIZE = 4194304  # 4 MiB chunk size for reading files

@classmethod
def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str:
"""Detect MIME type from file chunk using Python Magic.

Args:
chunk (bytes): File chunk to analyze
file_name (str): Name of the file being processed

Returns:
str: Detected MIME type (may be unsupported)
"""
# Primary MIME type detection using Python Magic
mime_type = magic.from_buffer(chunk, mime=True)
Review comment (Contributor): @muhammad-ali-e if the MIME type is octet-stream, can we have a secondary check using magika? You can connect with @johnyrahul to see how he handled it. Why? In some cases a PDF file's MIME type is not captured properly by magic and shows up as binary; magika can be used as a secondary check in such cases alone.

Review comment (Contributor): @ritwik-g is it a necessity to have both approaches? Maybe we could just use magika alone. I remember @shuveb suggested this too once.

logger.info(
f"Detected MIME type using Python Magic: {mime_type} for file {file_name}"
)

return mime_type
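
A minimal sketch of the fallback discussed in the thread above, assuming the magika package and its Magika().identify_bytes() API (both are assumptions, not part of this PR); a caller could pass a larger head buffer than a single chunk, which would also address the concern raised later in this review that a few bytes may not be enough for magic:

import magic
from magika import Magika  # assumed secondary detector, per the review thread

_magika = Magika()

def detect_mime_with_fallback(head: bytes) -> str:
    """Detect MIME type via python-magic, with magika as a secondary check."""
    mime_type = magic.from_buffer(head, mime=True)
    if mime_type == "application/octet-stream":
        # magic could not classify the content; fall back to magika
        result = _magika.identify_bytes(head)
        mime_type = result.output.mime_type
    return mime_type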

def __init__(
self,
workflow: Workflow,
@@ -885,6 +904,117 @@ def load_file(self, input_file_path: str) -> tuple[str, BytesIO]:

return os.path.basename(input_file_path), file_stream

@classmethod
def _process_file_chunks(
Review comment (Contributor): NIT: @muhammad-ali-e provide a more suitable name; _process_file_chunks is a bit vague. Also, we aren't accepting file chunks here; rather, we chunk the file ourselves.

cls, file: UploadedFile, file_storage, destination_path: str
) -> tuple[str, str, bool]:
"""Process file chunks and detect MIME type.

Args:
file: The uploaded file to process
file_storage: File storage instance
destination_path: Path where file should be stored

Returns:
tuple: (file_hash, mime_type, success) where success indicates if processing completed
"""
file_hash = sha256()
first_iteration = True
mime_type = file.content_type

file.seek(0)

try:
for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE):
if first_iteration:
mime_type = cls._detect_mime_type(chunk, file.name)
Review comment (Contributor): @muhammad-ali-e do we check the MIME type for every file, or only if file.content_type is octet-stream or missing? Reason for asking: in the past I recall @jagadeeswaran-zipstack working with the magic library, and often passing a few bytes was not enough; the entire file content had to be passed to detect some MIME types properly. So I am a bit worried that relying on the MIME type from the first chunk might not work in all cases.

if not AllowedFileTypes.is_allowed(mime_type):
raise UnsupportedMimeTypeError(
Review comment (Contributor): NIT: @muhammad-ali-e in the unsupported case, wondering if we even need to raise this error or if we can simply return "", mime_type, False. Either should be fine; just wondering which might be better. Since we are only handling this specific error, the try/except seems unnecessary and we could maybe return from here. Not sure which is the better practice.

f"Unsupported MIME type '{mime_type}' for file '{file.name}'"
)
first_iteration = False

file_hash.update(chunk)
file_storage.write(path=destination_path, mode="ab", data=chunk)

return file_hash.hexdigest(), mime_type, True

except UnsupportedMimeTypeError:
return "", mime_type, False

@classmethod
def _create_file_hash_object(
cls,
file_path: str,
connection_type,
file_name: str,
file_hash: str,
is_executed: bool,
file_size: int,
mime_type: str,
) -> FileHash:
"""Create FileHash object with provided parameters."""
return FileHash(
file_path=file_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=file_hash,
is_executed=is_executed,
file_size=file_size,
mime_type=mime_type,
)

@classmethod
def _handle_unsupported_file(
cls,
file_name: str,
mime_type: str,
destination_path: str,
connection_type,
file_size: int,
workflow_log,
) -> FileHash:
"""Handle files with unsupported MIME types."""
log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'"
workflow_log.log_info(logger=logger, message=log_message)

fake_hash = f"temp-hash-{uuid.uuid4().hex}"
return cls._create_file_hash_object(
file_path=destination_path,
connection_type=connection_type,
file_name=file_name,
file_hash=fake_hash,
is_executed=True,
file_size=file_size,
mime_type=mime_type,
)

@classmethod
def _check_duplicate_file(
cls, file_hash: str, unique_file_hashes: set[str], file_name: str, workflow_log
) -> bool:
"""Check if file is duplicate and log if needed."""
if file_hash in unique_file_hashes:
log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing."
workflow_log.log_info(logger=logger, message=log_message)
return True
Review comment (Contributor) on lines +1034 to +1037: @muhammad-ali-e didn't we recently discuss not skipping duplicates based on file hash / content, and allowing their processing?


unique_file_hashes.add(file_hash)
return False

@classmethod
def _get_execution_status(
Review comment (Contributor): NIT: Rename to _is_execution_completed() instead.

cls, use_file_history: bool, workflow, file_hash: str
) -> bool:
"""Get execution status for file based on history."""
if not use_file_history:
return False

file_history = FileHistoryHelper.get_file_history(
workflow=workflow, cache_key=file_hash
)
return True if file_history and file_history.is_completed() else False
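
A sketch of the rename suggested above, with the trailing conditional collapsed to bool(); behavior is unchanged:

@classmethod
def _is_execution_completed(
    cls, use_file_history: bool, workflow, file_hash: str
) -> bool:
    """Return True when file history records a completed execution for this hash."""
    if not use_file_history:
        return False
    file_history = FileHistoryHelper.get_file_history(
        workflow=workflow, cache_key=file_hash
    )
    return bool(file_history and file_history.is_completed())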

@classmethod
def add_input_file_to_api_storage(
cls,
@@ -919,65 +1049,61 @@ def add_input_file_to_api_storage(
workflow_id=workflow_id, execution_id=execution_id
)
workflow: Workflow = Workflow.objects.get(id=workflow_id)
file_hashes: dict[str, FileHash] = {}
file_hashes_objects: dict[str, FileHash] = {}
unique_file_hashes: set[str] = set()
connection_type = WorkflowEndpoint.ConnectionType.API

for file in file_objs:
file_name = file.name
destination_path = os.path.join(api_storage_dir, file_name)

mime_type = file.content_type
logger.info(f"Detected MIME type: {mime_type} for file {file_name}")
if not AllowedFileTypes.is_allowed(mime_type):
log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'"
workflow_log.log_info(logger=logger, message=log_message)
# Generate a clearly marked temporary hash to avoid reading the file content
# Helps to prevent duplicate entries in file executions
fake_hash = f"temp-hash-{uuid.uuid4().hex}"
file_hash = FileHash(
file_path=destination_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=fake_hash,
is_executed=True,
file_size=file.size,
mime_type=mime_type,
)
file_hashes.update({file_name: file_hash})
continue
logger.info(
f"Detected MIME type from content type header: {file.content_type} for file {file_name}"
)

file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_hash = sha256()
for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE):
file_hash.update(chunk)
file_storage.write(path=destination_path, mode="ab", data=chunk)
file_hash = file_hash.hexdigest()

# Skip duplicate files
if file_hash in unique_file_hashes:
log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing."
workflow_log.log_info(logger=logger, message=log_message)
continue
unique_file_hashes.add(file_hash)
# Process file chunks and detect MIME type
file_hash, mime_type, success = cls._process_file_chunks(
file, file_storage, destination_path
)

file_history = None
if use_file_history:
file_history = FileHistoryHelper.get_file_history(
workflow=workflow, cache_key=file_hash
)
# Handle unsupported files
if not success:
Review comment (Contributor): might make sense to rename it to something like process_chunks_success or mime_type_detected, which would be more meaningful.

file_hash_object = cls._handle_unsupported_file(
file_name,
mime_type,
destination_path,
connection_type,
file.size,
workflow_log,
)
is_executed = True if file_history and file_history.is_completed() else False
file_hash = FileHash(
file_path=destination_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=file_hash,
is_executed=is_executed,
file_size=file.size,
mime_type=mime_type,
)
file_hashes_objects.update({file_name: file_hash_object})
continue

# Check for duplicates
if cls._check_duplicate_file(
file_hash, unique_file_hashes, file_name, workflow_log
):
continue

# Get execution status
is_executed = cls._get_execution_status(use_file_history, workflow, file_hash)

# Create file hash object
file_hash_object = cls._create_file_hash_object(
destination_path,
connection_type,
file_name,
file_hash,
is_executed,
file.size,
mime_type,
)
file_hashes.update({file_name: file_hash})
return file_hashes
file_hashes_objects.update({file_name: file_hash_object})

return file_hashes_objects

@classmethod
def create_endpoint_for_workflow(