feat: add markdown_json output format for unstructured document parsing#948
feat: add markdown_json output format for unstructured document parsing#948Ryan Waskewich (rwask) wants to merge 6 commits intomainfrom
Conversation
Add a new output_format config option to UnstructuredFormat with two choices: - markdown_text (default, backward-compatible): renders documents as flat Markdown text - markdown_json: outputs a JSON array of structured elements with type, text, and metadata fields This preserves document structure (element types, page numbers, coordinates, section depth) for easier downstream processing. The change is additive and non-breaking. Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773176340-unstructured-markdown-json-output#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773176340-unstructured-markdown-json-outputPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
…wn_json When output_format='markdown_json', MD and TXT files now return a JSON array with a single NarrativeText element instead of raw text, ensuring consistent output format across all file types. Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
|
Warning Rate limit exceeded
⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughAdds an Changes
Sequence Diagram(s)sequenceDiagram
participant Source as File Source
participant Parser as Unstructured Parser
participant Local as Local Processor
participant Remote as Remote API
participant Output as Formatter
Source->>Parser: parse_records(file, format.output_format)
alt local
Parser->>Local: _read_file_locally_elements(file)
Local-->>Parser: List[Dict] elements
else remote
Parser->>Remote: _read_file_remotely_elements_with_retries(file)
Remote-->>Parser: List[Dict] elements
end
Parser->>Parser: if output_format == "markdown_json"
alt markdown_json
Parser->>Output: json.dumps(elements)
else markdown_text
Parser->>Output: render_markdown_from(elements)
end
Output-->>Source: formatted content
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Would you like me to suggest deduplicating the duplicated markdown_json tests and add a short doc note about the new 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py`:
- Around line 151-156: The content_description assignment advertises JSON when
format.output_format == "markdown_json" even though _read_file() returns raw
text for FileType.MD and FileType.TXT; update the logic in
unstructured_parser.py to align the schema description with actual return types
by checking the file type (e.g., FileType.MD or FileType.TXT) as well as
format.output_format before setting content_description, ensure
content_description describes plain markdown/text for those file types
regardless of output_format, and keep the markdown_json description only for
file types that actually yield JSON-structured records.
- Around line 339-355: The retry wrapper
_read_file_remotely_elements_with_retries currently reuses the same file_handle
across attempts causing requests.post to resend an EOF/partial stream; before
each retry attempt rewind the stream (call file_handle.seek(0)) or, if the
handle is not seekable, buffer the contents (e.g., into a BytesIO) and use that
buffer for each attempt so the uploaded file is complete on retries; apply the
same change to the other retry wrapper that delegates to the remote read (the
similar retry function for the chunked/alternative remote-read path) so both
retry paths reset or rebuffer the file_handle before each request attempt.
In `@unit_tests/sources/file_based/file_types/test_unstructured_parser.py`:
- Around line 782-786: The test fixture is passing the wrong keyword arg to
UnstructuredFormat; update the config.format assignment to use
skip_unprocessable_files (not skip_unprocessable_file_types) so the flag is
actually set—i.e., change the UnstructuredFormat(...) call where config.format
is assigned to pass skip_unprocessable_files=False while leaving the rest
(output_format and processing / APIProcessingConfigModel) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c5eaa411-e18d-490f-a2a4-a69101caf172
📒 Files selected for processing (3)
airbyte_cdk/sources/file_based/config/unstructured_format.pyairbyte_cdk/sources/file_based/file_types/unstructured_parser.pyunit_tests/sources/file_based/file_types/test_unstructured_parser.py
| @backoff.on_exception( | ||
| backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error | ||
| ) | ||
| def _read_file_remotely_elements_with_retries( | ||
| self, | ||
| file_handle: IOBase, | ||
| format: APIProcessingConfigModel, | ||
| filetype: FileType, | ||
| strategy: str, | ||
| remote_file: RemoteFile, | ||
| ) -> List[Dict[str, Any]]: | ||
| """ | ||
| Read a file remotely and return the raw JSON elements, retrying up to 5 times if the error is not caused by user error. | ||
| """ | ||
| return self._read_file_remotely_elements( | ||
| file_handle, format, filetype, strategy, remote_file | ||
| ) |
There was a problem hiding this comment.
Could we rewind file_handle before each remote attempt, wdyt?
This retry path reuses the same stream across attempts, but requests.post(..., files=...) consumes it. After the first transient failure, later retries can resend EOF/partial data and make recovery impossible.
Suggested fix
def _read_file_remotely_elements(
self,
file_handle: IOBase,
format: APIProcessingConfigModel,
filetype: FileType,
strategy: str,
remote_file: RemoteFile,
) -> List[Dict[str, Any]]:
+ file_handle.seek(0)
headers = {"accept": "application/json", "unstructured-api-key": format.api_key}
data = self._params_to_dict(format.parameters, strategy)
file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}Also applies to: 386-410
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py` around
lines 339 - 355, The retry wrapper _read_file_remotely_elements_with_retries
currently reuses the same file_handle across attempts causing requests.post to
resend an EOF/partial stream; before each retry attempt rewind the stream (call
file_handle.seek(0)) or, if the handle is not seekable, buffer the contents
(e.g., into a BytesIO) and use that buffer for each attempt so the uploaded file
is complete on retries; apply the same change to the other retry wrapper that
delegates to the remote read (the similar retry function for the
chunked/alternative remote-read path) so both retry paths reset or rebuffer the
file_handle before each request attempt.
There was a problem hiding this comment.
This is a pre-existing pattern — the existing _read_file_remotely method (line 361) has the same behavior and doesn't add seek(0) before the request either. The new _read_file_remotely_elements method follows the same pattern for consistency. Addressing the retry seek behavior for both methods would be a separate improvement outside the scope of this PR.
…_files in test Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (1)
346-359:⚠️ Potential issue | 🟠 MajorCould we rewind or rebuffer the upload stream before each retry, wdyt?
This wrapper retries with the same
file_handle.requests.post(..., files=...)consumes that stream, so after the first transient failure the next attempt can resend EOF/partial content and make recovery impossible on the new remote path.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py` around lines 346 - 359, The wrapper _read_file_remotely_elements_with_retries reuses the same file_handle across retries but does not rewind or rebuffer it, so subsequent requests may send EOF/partial data after the first attempt; before each retry call to _read_file_remotely_elements you should reset the upload stream: if file_handle is seekable call file_handle.seek(0), otherwise replace it with a fresh in-memory or temporary buffered stream (e.g., read into BytesIO or tempfile and use that) so each requests.post(..., files=...) sees the full content; ensure this rewind/buffering happens before every retry attempt in _read_file_remotely_elements_with_retries.
🧹 Nitpick comments (1)
unit_tests/sources/file_based/file_types/test_unstructured_parser.py (1)
809-839: Could we parametrize this overFileType.MDandFileType.TXT, wdyt?The production branch handles both extensions, but this regression test only locks in
.md. Covering.txttoo would better protect the compatibility fix called out in the PR.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/file_based/file_types/test_unstructured_parser.py` around lines 809 - 839, Parametrize the test_parse_records_markdown_json_md_file test to run for both FileType.MD and FileType.TXT: replace the single-case setup of mock_detect_filetype.return_value = FileType.MD with a pytest.mark.parametrize over a variable like file_type (values FileType.MD and FileType.TXT), set mock_detect_filetype.return_value = file_type, and adjust the fake_file URI extension accordingly (e.g., use ".md" when file_type is FileType.MD and ".txt" when FileType.TXT) so UnstructuredParser.parse_records is exercised for both extensions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py`:
- Around line 413-414: The code currently returns response.json() (assigned to
json_response) without verifying its type; update the logic after calling
response.json() in unstructured_parser.py to assert the result is a list (use
isinstance(json_response, list)), and if it's not a list raise a descriptive
exception (e.g., ValueError or RuntimeError) that includes the unexpected
response content (or response.text) so callers see the mismatch instead of
silently receiving a dict; ensure the function no longer returns non-list values
and references the json_response variable and the response.json() call in the
error path.
---
Duplicate comments:
In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py`:
- Around line 346-359: The wrapper _read_file_remotely_elements_with_retries
reuses the same file_handle across retries but does not rewind or rebuffer it,
so subsequent requests may send EOF/partial data after the first attempt; before
each retry call to _read_file_remotely_elements you should reset the upload
stream: if file_handle is seekable call file_handle.seek(0), otherwise replace
it with a fresh in-memory or temporary buffered stream (e.g., read into BytesIO
or tempfile and use that) so each requests.post(..., files=...) sees the full
content; ensure this rewind/buffering happens before every retry attempt in
_read_file_remotely_elements_with_retries.
---
Nitpick comments:
In `@unit_tests/sources/file_based/file_types/test_unstructured_parser.py`:
- Around line 809-839: Parametrize the test_parse_records_markdown_json_md_file
test to run for both FileType.MD and FileType.TXT: replace the single-case setup
of mock_detect_filetype.return_value = FileType.MD with a
pytest.mark.parametrize over a variable like file_type (values FileType.MD and
FileType.TXT), set mock_detect_filetype.return_value = file_type, and adjust the
fake_file URI extension accordingly (e.g., use ".md" when file_type is
FileType.MD and ".txt" when FileType.TXT) so UnstructuredParser.parse_records is
exercised for both extensions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 06a83371-f490-4862-a339-141b4f6b827f
📒 Files selected for processing (2)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.pyunit_tests/sources/file_based/file_types/test_unstructured_parser.py
| json_response: List[Dict[str, Any]] = response.json() | ||
| return json_response |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cd /tmp && find . -name "unstructured_parser.py" 2>/dev/null | head -5Repository: airbytehq/airbyte-python-cdk
Length of output: 54
🏁 Script executed:
# Let's search for the file in the repository
find . -name "unstructured_parser.py" 2>/dev/nullRepository: airbytehq/airbyte-python-cdk
Length of output: 138
🏁 Script executed:
# Get context around lines 413-414
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '400,430p'Repository: airbytehq/airbyte-python-cdk
Length of output: 1487
🏁 Script executed:
# Let's also check if there's validation elsewhere and understand the full method
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '380,440p'Repository: airbytehq/airbyte-python-cdk
Length of output: 2964
🏁 Script executed:
# Search for markdown_json function and its usage
rg "markdown_json" --type py -B 5 -A 10Repository: airbytehq/airbyte-python-cdk
Length of output: 21328
🏁 Script executed:
# Check what happens to the returned value from _read_file_remotely_elements
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,330p'Repository: airbytehq/airbyte-python-cdk
Length of output: 2039
🏁 Script executed:
# Check the _render_markdown method to see how it handles the data
rg "_render_markdown" -B 2 -A 10 airbyte_cdk/sources/file_based/file_types/unstructured_parser.pyRepository: airbytehq/airbyte-python-cdk
Length of output: 2755
🏁 Script executed:
# Let's see the complete parse_records flow
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,350p'Repository: airbytehq/airbyte-python-cdk
Length of output: 2956
🏁 Script executed:
# Check if there are other places where response.json() is called without validation
rg "response\.json\(\)" airbyte_cdk/sources/file_based/file_types/unstructured_parser.py -B 3 -A 3Repository: airbytehq/airbyte-python-cdk
Length of output: 899
🏁 Script executed:
# Let's also check what happens if json.dumps is called on a dict vs a list
python3 << 'EOF'
import json
# Test what happens with dict
dict_response = {"detail": "error"}
list_response = [{"type": "Title", "text": "heading"}]
print("json.dumps on dict:")
print(json.dumps(dict_response))
print("\njson.dumps on list:")
print(json.dumps(list_response))
# What happens when we iterate?
print("\n\nIterating over dict:")
for item in dict_response:
print(f" {item}")
print("\nIterating over list:")
for item in list_response:
print(f" {item}")
EOFRepository: airbytehq/airbyte-python-cdk
Length of output: 264
Validate that the API response is actually a list before returning it.
Right now if the API returns a 200 with {"detail": "..."} instead of an array, the code silently emits a JSON object even though the schema promises a JSON array of elements. Since json.dumps() works on both dicts and lists, the error goes undetected downstream. Could we add a check that response.json() returns a list, wdyt?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py` around
lines 413 - 414, The code currently returns response.json() (assigned to
json_response) without verifying its type; update the logic after calling
response.json() in unstructured_parser.py to assert the result is a list (use
isinstance(json_response, list)), and if it's not a list raise a descriptive
exception (e.g., ValueError or RuntimeError) that includes the unexpected
response content (or response.text) so callers see the mismatch instead of
silently receiving a dict; ensure the function no longer returns non-list values
and references the json_response variable and the response.json() call in the
error path.
…red format Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
…r wrapping Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
…edText in test Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
feat: add markdown_json output format for unstructured document parsing
Summary
Adds a new
output_formatconfig option toUnstructuredFormatwith two choices:markdown_text(default): existing behavior, renders documents as flat Markdown textmarkdown_json: outputs a JSON-serialized array of structured elements preserving type, text, and metadata fields from the unstructured parsing engineThis is an additive, non-breaking change. All existing connections continue to use
markdown_textby default. The feature applies to all file-based connectors (S3, Azure Blob, GCS, SharePoint, SFTP, etc.) since they share the CDK's unstructured parser.Files changed:
unstructured_format.py— newoutput_formatfield on the config modelunstructured_parser.py— new element-returning methods (_read_file_locally_elements,_read_file_remotely_elements,_read_file_remotely_elements_with_retries);_read_filenow routes through these and conditionally callsjson.dumpsor_render_markdowntest_unstructured_parser.py— three new test cases covering schema inference, local parsing, and remote API parsing withmarkdown_jsonReview & Testing Checklist for Human
markdown_textmode:_read_filenow calls_read_file_remotely_elements_with_retries(new) instead of_read_file_remotely_with_retries(old) for ALL API calls, not justmarkdown_json. The old_read_file_remotely_with_retries→_read_file_remotelypath is now only used bycheck_config. Verify this doesn't introduce regressions for the default text output.filetype in {FileType.MD, FileType.TXT}, the raw file content is returned as-is, ignoring theoutput_formatsetting. Users selectingmarkdown_jsonfor.mdor.txtfiles will still get plain text, not JSON. Is this the intended behavior or should these files be parsed into JSON elements too?_read_file_remotely_elementsduplicates the HTTP call logic from_read_file_remotely. Consider whether this should be refactored to avoid drift (e.g., have_read_file_remotelycall_read_file_remotely_elementsinternally).Notes
markdown_text.markdown_jsonpath.Devin session | Requested by Ryan Waskewich (@rwask)
Summary by CodeRabbit
New Features
Tests