API Reference

This section is for detailed usage and code documentation.

This page aggregates the auto-generated API documentation for the main classes and functions in Pydantask. The content is rendered from the docstrings in your code via the configured documentation tool (e.g. mkdocstrings).


Orchestrator

DeepAgent

High-level orchestrator that coordinates planning, supervision, execution, and QA across sub‑agents.

from pydantask.agents import DeepAgent

Pydantic AI based DeepAgent that manages sub-agents to achieve complex goals.
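
The constructor in the source listing below accepts `model` as a bare name, a provider-prefixed string, or a `Model` instance, and it ties `planning_mode` and the checkpoint flags to other arguments. The following is a minimal sketch of those argument rules, written as standalone functions so they can be tried without installing Pydantask. The names `split_model_id` and `check_run_options` are hypothetical and exist only for this illustration; they are not part of the library.

```python
from __future__ import annotations


def split_model_id(model: str) -> tuple[str, str]:
    """Split 'provider:name' into (provider, name).

    Hypothetical helper: a bare name falls back to the 'openai' provider,
    mirroring the rule described in the constructor's comments.
    """
    if ":" in model:
        provider, name = model.split(":", 1)
        return provider.strip().lower(), name.strip()
    return "openai", model


def check_run_options(
    planning_mode: str = "llm",
    has_seed_plan: bool = False,
    checkpoint: bool = False,
    checkpoint_dir: str | None = None,
    run_from_checkpoint: bool = False,
) -> bool:
    """Apply the constructor's argument rules and return the effective checkpoint flag."""
    # 'fixed' and 'hybrid' both need an initial plan to work from.
    if planning_mode in {"fixed", "hybrid"} and not has_seed_plan:
        raise ValueError(
            "seed_plan must be provided when planning_mode is 'fixed' or 'hybrid'"
        )
    # Resuming needs to know which directory holds the recorded events.
    if run_from_checkpoint and checkpoint_dir is None:
        raise ValueError(
            "checkpoint_dir must be provided when run_from_checkpoint=True"
        )
    # Supplying a directory (or asking to resume) switches checkpointing on.
    return checkpoint or checkpoint_dir is not None or run_from_checkpoint


print(split_model_id("anthropic:claude-sonnet-4-5"))  # ('anthropic', 'claude-sonnet-4-5')
print(split_model_id("gpt-4.1-mini"))                 # ('openai', 'gpt-4.1-mini')
print(check_run_options(checkpoint_dir="_checkpoint/demo"))  # True
```

In this sketch an unknown provider prefix is returned as-is; the real constructor goes further and raises `ValueError` for prefixes it does not support.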

Source code in src/pydantask/agents/agent.py
class DeepAgent:
    """Pydantic AI based DeepAgent that manages sub-agents to achieve complex goals."""

    def __init__(
        self,
        objective: str,
        model: str | Model = "gpt-5.2",
        seed_plan: Plan | None = None,
        planning_mode: Literal["llm", "fixed", "hybrid"] = "llm",
        critic_agent: Optional[Agent] = None,
        supervisor_agent: Optional[Agent] = None,
        researcher_agent: Optional[Agent] = None,
        max_steps: int = 20,
        set_token_budget: Union[int, None] = None,
        sub_agents: Union[None, list[CapabilityDescription]] = None,
        # default output type for the producer agent, can be set to a default type or custom pydantic model for better structure and validation of final output
        # output_type: Type = TaskResult,
        # planning_mode: str = "dynamic",  # "static" | "dynamic"
        trace: bool = False,
        checkpoint: bool = False,
        checkpoint_dir: Path | str | None = None,
        run_from_checkpoint: bool = False,
        verbose_logging: bool = False,
    ):
        """Initialize a DeepAgent instance.

        Args:
            objective: The overall objective / task the deep agent is working on.
            model: Model identifier or ``pydantic_ai.models.Model`` instance to use
                for all sub-agents. Defaults to ``"gpt-5.2"``.
            seed_plan: Optional pre-defined :class:`~pydantask.models.Plan` to seed
                the initial task DAG. If provided, it is loaded into
                :class:`~pydantask.models.RuntimeState.plan` at the start of
                :meth:`run`.

                Notes:
                * Task IDs are respected and used as keys in ``RuntimeState.plan``.
                * ``RuntimeState.next_task_id`` is set to ``max(task_id) + 1``.
                * Dependencies are validated to ensure they reference existing tasks.
            planning_mode: Controls whether the supervisor is allowed to modify the
                plan at runtime.

                * ``"llm"``: The supervisor may add/patch tasks.
                * ``"hybrid"``: Same as ``"llm"``, but requires ``seed_plan``, which
                  provides an initial DAG the supervisor can extend.
                * ``"fixed"``: Requires ``seed_plan``. The supervisor is not given the
                  plan-mutation tools (``add_task``/``patch_task``/``mark_final_task``)
                  and can only execute/transition the existing tasks.
            critic_agent: Optional pre-configured critic ``Agent``. If omitted, a
                default critic agent is created.
            supervisor_agent: Optional supervisor ``Agent`` used to manage the task
                DAG. If omitted, a default dynamic supervisor is created.
            researcher_agent: Optional research ``Agent``. If omitted, a default
                web/doc research agent is created.
            producer_agent: Not currently accepted by ``__init__``; a default
                producer agent is always created.
            max_steps: Maximum number of DeepAgent control-loop iterations to run
                before forcing termination.
            set_token_budget: Optional global token budget for the run. Currently
                stored but not strictly enforced.
            sub_agents: Additional ``CapabilityDescription`` objects to register as
                callable sub-agents alongside the built-ins.
            output_type: Not currently accepted by ``__init__`` (the parameter is
                commented out); the default producer agent returns ``TaskResult``.
            trace: If ``True``, auto-configure tracing via the configured backend.
            checkpoint: If ``True``, enable event-sourced checkpoint logging for recovery.
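            checkpoint_dir: Optional directory to reuse for checkpoints when resuming a run.
                If omitted, a unique directory under ``_checkpoint/`` is created.
                Providing a directory enables checkpointing even if ``checkpoint``
                is ``False``.
            run_from_checkpoint: If ``True``, replay the events stored in
                ``checkpoint_dir`` on :meth:`run`. Requires ``checkpoint_dir``.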
            verbose_logging: If ``True``, log richer debugging information during
                execution.
        """

        if trace:
            init_tracing_backend(autodetect_tracing_backend())

        # `model` can be either:
        #   - a pydantic_ai Model instance (fully custom)
        #   - a bare model name (defaults to OpenAI), e.g. "gpt-4.1-mini"
        #   - a provider-prefixed string, e.g. "openai:gpt-4.1-mini" or "anthropic:claude-sonnet-4-5"
        self.model_name: str = (
            model if isinstance(model, str) else model.__class__.__name__
        )

        if objective is None:
            raise TypeError("DeepAgent requires 'objective' to be provided")

        if planning_mode in {"fixed", "hybrid"} and seed_plan is None:
            raise ValueError(
                "seed_plan must be provided when planning_mode is 'fixed' or 'hybrid'"
            )

        self.objective: str = objective
        self._max_steps: int = max_steps  # Max steps to prevent infinite loops
        self.token_budget: Union[int, None] = set_token_budget
        self.verbose = verbose_logging
        # self.output_type = output_type
        self.planning_mode = planning_mode
        self.seed_plan: Union[Plan, None] = seed_plan
        self._retry_client = self._create_retrying_client()

        # Checkpointing / resume semantics:
        # - `checkpoint=True` enables writing events.
        # - `checkpoint_dir=...` forces checkpointing on and chooses the directory.
        # - `run_from_checkpoint=True` requires `checkpoint_dir` and will replay
        #   events from that directory on `run()`.
        if run_from_checkpoint and checkpoint_dir is None:
            raise ValueError(
                "checkpoint_dir must be provided when run_from_checkpoint=True"
            )

        if checkpoint_dir is not None or run_from_checkpoint:
            checkpoint = True

        self.checkpoint = checkpoint
        self.run_from_checkpoint = run_from_checkpoint

        # Concurrency guardrails:
        # - `_plan_lock` protects plan-level mutations and task claiming (READY->RUNNING).
        self._plan_lock = asyncio.Lock()

        self.checkpoint_path: Path | None = None
        self._checkpoint_recorder: CheckpointRecorder | None = None
        if self.checkpoint:
            self.checkpoint_path = (
                Path(checkpoint_dir)
                if checkpoint_dir is not None
                else Path(f"_checkpoint/{uuid.uuid4()}/")
            )
            self.checkpoint_path.mkdir(parents=True, exist_ok=True)
            self._checkpoint_recorder = CheckpointRecorder(self.checkpoint_path)

        # Build the shared model used by all sub-agents.
        # TODO: Allow configuring which model to use per capability.
        # We inject the retrying httpx client into the provider for durability.
        self._retry_model = self._build_model(model)

        # NOTE: Filesystem tools exist in `pydantask.tools.default_tools`, but are not
        # enabled by default. The harness is currently in-memory focused.
        self._critic_agent = critic_agent or Agent(
            model=self._retry_model,
            name="_default_Critic_Agent",
            system_prompt=CRITIC_SYS_PROMPT,
            output_type=TaskQAResult,
            deps_type=RuntimeState,
            tools=[get_current_datetime, think_tool],
            # end_strategy="exhaustive",
        )

        self._supervisor_agent = supervisor_agent or Agent(
            model=self._retry_model,
            name="_dynamic_Supervisor_Agent",
            system_prompt=DYNAMIC_SUPERVISOR_SYS_PROMPT,
            output_type=SupervisorDecision,
            deps_type=RuntimeState,
            tools=self._default_supervisor_tools(),
            end_strategy="exhaustive",
        )

        # TODO: rework some of these tools
        tavily_api_key = os.getenv("TAVILY_API_KEY", None)

        _default_research_tool_set = [
            think_tool,
            append_scratch_note,
            read_scratch_notes,
            get_current_datetime,
        ]

        if not tavily_api_key:
            logger.info(
                "Tavily API key not found. Defaulting to the built-in DuckDuckGo search tool."
            )
            _default_research_tool_set.append(duckduckgo_search_tool())
        else:
            _default_research_tool_set.append(tavily_search_tool(tavily_api_key))

        self._researcher_agent = researcher_agent or Agent(
            model=self._retry_model,
            name="_default_Research_Agent",
            system_prompt=RESEARCH_AGENT_SYS_PROMPT,
            tools=_default_research_tool_set,
            deps_type=TaskRunDeps,
            output_type=TaskResult,
        )

        self._capability_registry = self._setup_capabilities(
            additonal_capabilities=sub_agents
        )

        # Scheduler/system notes injected into the next supervisor prompt.
        self._last_scheduler_report: str = ""

    async def aclose(self) -> None:
        """Close underlying resources used by this ``DeepAgent`` instance.

        This is primarily responsible for flushing any tracing backends and
        closing the shared async HTTP client used by the model providers.
        Safe to call multiple times.
        """
        try:
            # best-effort; safe to call even if tracing is disabled
            flush_tracing()
        finally:
            if getattr(self, "_retry_client", None) is not None:
                await self._retry_client.aclose()

    async def __aenter__(self) -> "DeepAgent":
        """Enter the async context manager and return this ``DeepAgent``.

        Allows ``async with DeepAgent(...) as agent: ...`` usage.
        """
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Exit the async context manager, ensuring resources are cleaned up."""
        await self.aclose()

    def _default_supervisor_tools(self) -> list[Callable]:
        """Return the set of tools exposed to the supervisor agent.

        The selected toolset depends on :attr:`planning_mode`:

        * ``fixed``: supervisor can update/cancel tasks and view QA reports, but
          cannot add or patch tasks or mark a task as final.
        * ``llm`` / ``hybrid``: supervisor can also add, patch, and mark final tasks.

        Note: This is enforced by tool registration (not just prompting), so in
        ``fixed`` mode the supervisor LLM cannot call ``add_task``,
        ``patch_task``, or ``mark_final_task``.
        """
        base_tools = [
            self.update_task_status,
            self.cancel_task,
            self.view_qa_report,
            get_current_datetime,
            think_tool,
        ]

        mutating_tools = [
            self.add_task,
            self.patch_task,
            self.mark_final_task,
        ]

        if self.planning_mode == "fixed":
            return base_tools

        return base_tools + mutating_tools

    def _build_model(self, model: str | Model) -> Model:
        """Construct a ``pydantic_ai`` model, wiring in the shared HTTP client.

        The ``model`` parameter may be either:

        * A bare model name (e.g. ``"gpt-4.1-mini"``) which defaults to the
          OpenAI provider.
        * A provider-prefixed string such as ``"openai:gpt-4.1-mini"`` or
          ``"anthropic:claude-sonnet-4-5"``.
        * An already-instantiated ``pydantic_ai.models.Model`` instance, which is
          returned unchanged.
        """
        if isinstance(model, Model):
            return model

        provider_name: str
        model_name: str
        if ":" in model:
            provider_name, model_name = model.split(":", 1)
            provider_name = provider_name.strip().lower()
            model_name = model_name.strip()
        else:
            provider_name, model_name = "openai", model

        if provider_name in {"openai", "openai_compat", "openrouter"}:
            # NOTE: "openrouter" here assumes an OpenAI-compatible API. For true
            # OpenRouter defaults (headers/routing), OpenRouterProvider may be a better fit.
            return OpenAIChatModel(
                model_name, provider=OpenAIProvider(http_client=self._retry_client)
            )

        if provider_name == "anthropic":
            return AnthropicModel(
                model_name,
                provider=AnthropicProvider(http_client=self._retry_client),
            )

        raise ValueError(
            f"Unsupported model provider prefix: {provider_name!r}. "
            "Use e.g. 'openai:...' or 'anthropic:...' or pass a Model instance."
        )

    def _create_retrying_client(self):
        """Create an ``httpx.AsyncClient`` with robust retry behaviour.

        The returned client uses ``AsyncTenacityTransport`` with sensible
        defaults for rate limits and transient network failures. See
        https://ai.pydantic.dev/retries/ for more details.
        """

        def should_retry_status(response):
            """Raise exceptions for retryable HTTP status codes."""
            if response.status_code in (429, 502, 503, 504):
                response.raise_for_status()  # This will raise HTTPStatusError

        transport = AsyncTenacityTransport(
            config=RetryConfig(
                # Retry on HTTP errors and connection issues
                retry=retry_if_exception_type((HTTPStatusError, ConnectionError)),
                # Smart waiting: respects Retry-After headers, falls back to exponential backoff
                wait=wait_retry_after(
                    fallback_strategy=wait_exponential(multiplier=1, max=60),
                    max_wait=300,
                ),
                # Stop after 5 attempts
                stop=stop_after_attempt(5),
                # Re-raise the last exception if all retries fail
                reraise=True,
            ),
            validate_response=should_retry_status,
        )

        return AsyncClient(transport=transport)

    def _setup_capabilities(
        self, additonal_capabilities: Union[None, list[CapabilityDescription]] = None
    ) -> Dict:
        """Create the default sub-agent capability registry.

        This wires up the built-in producer, researcher, and general worker
        agents, and optionally merges any extra ``CapabilityDescription``
        instances supplied by the caller.

        Args:
            additonal_capabilities: Additional capabilities to register on top of
                the built-in sub-agents.

        Returns:
            Dict[str, CapabilityDescription]: Mapping from capability name to
            its description and callable agent/tool.
        """

        producer_agent = Agent(
            model=self._retry_model,
            name="_default_Producer_agent",
            system_prompt=PRODUCER_SYS_PROMPT,
            deps_type=TaskRunDeps,
            output_type=TaskResult,
            tools=[
                # Plan / history inspection
                list_completed_tasks,
                get_task_result,
                # Reflection
                think_tool,
            ],
        )

        producer = CapabilityDescription(
            name="producer_agent",
            description="Produces output based on information from various sources and sub agents.",
            tool_func=as_runner(producer_agent),
        )

        researcher = CapabilityDescription(
            name="research_agent",
            description="Tool to research information. This could include searching the web or querying a data source.",
            tool_func=as_runner(self._researcher_agent),
        )

        general_worker_agent = Agent(
            model=self._retry_model,
            name="_default_General_Worker_Agent",
            system_prompt=WORKER_AGENT_SYS_PROMPT,
            deps_type=TaskRunDeps,
            output_type=TaskResult,
            tools=[
                # list_documents,
                list_completed_tasks,
                get_task_result,
                think_tool,
                append_scratch_note,
                read_scratch_notes,
                get_current_datetime,
            ],
        )

        gen_worker = CapabilityDescription(
            name="worker_agent",
            description=(
                "General-purpose worker for analysis, summarization, document editing, "
                "code or log interpretation, and other non-research tasks that operate on "
                "existing context."
            ),
            tool_func=as_runner(general_worker_agent),
        )

        _capabilities_list = [producer, researcher, gen_worker]

        # If additional sub-agents have been supplied, add them to the registry.
        if additonal_capabilities:
            _capabilities_list.extend(additonal_capabilities)

        # Key the registry by capability name; a later entry with the same name
        # replaces an earlier one.
        _capability_registry = {
            sub_agent.name: sub_agent for sub_agent in _capabilities_list
        }
        return _capability_registry


    def _initialize_runtime_state(self, objective: str, registry: dict) -> RuntimeState:
        """Create the initial :class:`RuntimeState` for a new DeepAgent run.

        This initializes an empty plan. If ``seed_plan`` was provided when the
        DeepAgent was constructed, it is applied at the start of :meth:`run`.

        Args:
            objective: Top-level objective for this DeepAgent execution.
            registry: Mapping of capability names to ``CapabilityDescription``
                instances.

        Returns:
            A freshly initialized ``RuntimeState`` with an empty plan and
            ``next_task_id`` set to ``1``.
        """
        runtime_state = RuntimeState(
            objective=objective, capability_registry=registry, next_task_id=1
        )
        runtime_state.checkpoint_recorder = self._checkpoint_recorder
        return runtime_state

    def _checkpoint_state(self, runtime: RuntimeState):
        """Persist a lightweight runtime summary when checkpointing is enabled."""
        if not self._checkpoint_recorder:
            return

        summary = {
            "ts": datetime.utcnow().isoformat(),
            "runtime_steps": runtime.runtime_steps,
            "total_tasks": len(runtime.plan),
            "status_counts": dict(
                Counter(t.status.value for t in runtime.plan.values())
            ),
            "next_task_id": runtime.next_task_id,
        }
        self._checkpoint_recorder.record_summary(summary)

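    def _replay_checkpoint(self, runtime_state: RuntimeState) -> None:
        """Rebuild ``runtime_state`` by applying each recorded checkpoint event.

        Events are applied in the order returned by the recorder. Afterwards,
        ``next_task_id`` is raised above the highest task ID in the plan.
        """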
        if not self._checkpoint_recorder:
            return

        events = self._checkpoint_recorder.load_events()
        if not events:
            return

        for event in events:
            self._apply_event(runtime_state, event)

        if runtime_state.plan:
            max_existing = max(runtime_state.plan.keys()) + 1
            runtime_state.next_task_id = max(runtime_state.next_task_id, max_existing)

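    def _apply_event(self, runtime_state: RuntimeState, event: CheckpointEvent) -> None:
        """Apply a single checkpoint event to ``runtime_state``.

        Dispatches on ``event.type``; unrecognized event types are ignored.
        """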
        payload = event.payload or {}
        event_type = event.type

        if event_type == "task_added":
            task_data = payload.get("task")
            if not task_data:
                return
            task = TaskItem(**task_data)
            runtime_state.plan[task.task_id] = task
            runtime_state.next_task_id = max(
                runtime_state.next_task_id,
                payload.get("next_task_id", task.task_id + 1),
            )
            return

        if event_type == "task_patched":
            task_id = payload.get("task_id")
            if task_id is None or task_id not in runtime_state.plan:
                return
            task = runtime_state.plan[task_id]
            if "sub_task_objective" in payload:
                task.sub_task_objective = payload["sub_task_objective"]
            if "dependencies" in payload:
                task.sub_task_dependencies = payload["dependencies"]
            if "is_final" in payload:
                task.is_final = bool(payload["is_final"])
            return

        if event_type == "final_task_set":
            task_id = payload.get("task_id")
            if task_id is None:
                return

            # Enforce the invariant: at most one task is marked final.
            for t in runtime_state.plan.values():
                t.is_final = False

            if task_id in runtime_state.plan:
                runtime_state.plan[task_id].is_final = True
            return

        if event_type == "task_status_updated":
            task_id = payload.get("task_id")
            if task_id is None or task_id not in runtime_state.plan:
                return
            task = runtime_state.plan[task_id]
            status_value = payload.get("status")
            if status_value is not None:
                task.status = TaskStatus(status_value)
            if "error_msg" in payload:
                task.error_msg = payload.get("error_msg")
            reason = payload.get("reason")
            if reason:
                history = task.metadata.setdefault("status_history", [])
                if isinstance(history, list):
                    history.append(
                        {
                            "ts": event.ts.isoformat(),
                            "status": task.status.value,
                            "reason": reason,
                        }
                    )
            return

        if event_type == "task_result":
            task_id = payload.get("task_id")
            result_payload = payload.get("result")
            if (
                task_id is None
                or result_payload is None
                or task_id not in runtime_state.plan
            ):
                return

            # If the event references a sidecar file, prefer that full payload.
            full_path = payload.get("full_result_path")
            if isinstance(full_path, str) and full_path:
                loaded = self._load_full_task_result_payload(full_path)
                if loaded is not None:
                    result_payload = loaded

            runtime_state.plan[task_id].result = TaskResult(**result_payload)
            return

        if event_type == "task_metadata_appended":
            task_id = payload.get("task_id")
            key = payload.get("key")
            value = payload.get("value")
            if task_id is None or key is None or task_id not in runtime_state.plan:
                return
            task = runtime_state.plan[task_id]
            existing = task.metadata.get(key)
            if existing is None:
                task.metadata[key] = value
            elif isinstance(existing, list):
                existing.append(value)
            elif isinstance(existing, str):
                task.metadata[key] = existing + f"\n\n{value}"
            else:
                task.metadata[key] = value
            return

        if event_type == "scratch_note_appended":
            task_id = payload.get("task_id")
            if task_id is None or task_id not in runtime_state.plan:
                return
            note = payload.get("note", "")
            key = "scratch_notes"
            existing = runtime_state.plan[task_id].metadata.get(key, "")
            runtime_state.plan[task_id].metadata[key] = existing + f"\n\n{note}"
            return

        if event_type == "critic_feedback":
            task_id = payload.get("task_id")
            if task_id is None or task_id not in runtime_state.plan:
                return
            feedback_payload = payload.get("feedback")
            if feedback_payload is not None:
                runtime_state.plan[task_id].task_feedback = TaskQAResult(
                    **feedback_payload
                )
            if "attempt_count" in payload:
                runtime_state.plan[task_id].attempt_count = payload["attempt_count"]
            return

        # supervisor_decision and other audit events do not mutate state on replay.

    def _record_event(
        self, event_type: CheckpointEventType, payload: Dict[str, Any]
    ) -> None:
        if self._checkpoint_recorder:
            self._checkpoint_recorder.record(event_type, payload)

    def _record_task_status_event(
        self,
        task_id: int,
        status: TaskStatus,
        *,
        reason: str | None = None,
        error_msg: str | None = None,
    ) -> None:
        payload: Dict[str, Any] = {"task_id": task_id, "status": status.value}
        if reason:
            payload["reason"] = reason
        if error_msg:
            payload["error_msg"] = error_msg
        self._record_event("task_status_updated", payload)

    def _persist_full_task_result_payload(
        self, task_id: int, result_payload: Dict[str, Any]
    ) -> str | None:
        """Persist the full TaskResult payload under the checkpoint directory.

        Returns a *relative* path (from checkpoint root) that can be stored in
        the event log, or ``None`` if persistence is unavailable.
        """
        if self.checkpoint_path is None:
            return None

        artifacts_dir = self.checkpoint_path / TASK_RESULT_ARTIFACT_DIRNAME
        artifacts_dir.mkdir(parents=True, exist_ok=True)

        relpath = f"{TASK_RESULT_ARTIFACT_DIRNAME}/task_{task_id}.json"
        path = self.checkpoint_path / relpath
        with path.open("w", encoding="utf-8") as fh:
            json.dump(result_payload, fh, ensure_ascii=False)

        return relpath

    def _load_full_task_result_payload(self, relpath: str) -> Dict[str, Any] | None:
        """Load a full TaskResult payload previously persisted by this agent."""
        if self.checkpoint_path is None:
            return None

        path = self.checkpoint_path / relpath
        if not path.exists():
            return None

        try:
            with path.open("r", encoding="utf-8") as fh:
                data = json.load(fh)
            if isinstance(data, dict):
                return data
        except Exception:
            return None

        return None

    def _record_task_result(self, task: TaskItem) -> None:
        if not self._checkpoint_recorder or not task.result:
            return

        # Use JSON mode so datetimes (e.g. SourceRef.accessed_at) are serializable.
        result_payload: Dict[str, Any] = task.result.model_dump(mode="json")

        full_result_path: str | None = None
        detailed_output = result_payload.get("detailed_output") or ""
        if detailed_output and len(detailed_output) > EVENT_RESULT_DETAIL_TRUNCATION:
            # Persist the full payload to a sidecar file so replay can restore it.
            full_result_path = self._persist_full_task_result_payload(
                task.task_id, result_payload
            )

            truncation_notice = (
                f"\n\n...[TRUNCATED {len(detailed_output) - EVENT_RESULT_DETAIL_TRUNCATION} chars]..."
            )
            result_payload["detailed_output"] = (
                detailed_output[:EVENT_RESULT_DETAIL_TRUNCATION] + truncation_notice
            )

        payload: Dict[str, Any] = {"task_id": task.task_id, "result": result_payload}
        if full_result_path:
            payload["full_result_path"] = full_result_path

        self._record_event("task_result", payload)

    def _record_metadata_append(self, task_id: int, key: str, value: Any) -> None:
        if not self._checkpoint_recorder:
            return
        self._record_event(
            "task_metadata_appended", {"task_id": task_id, "key": key, "value": value}
        )

    def _format_capabilities(self) -> str:
        """Format all registered capabilities into a planner-friendly string.

        Each line is of the form: ``- <capability_name>: <description>``.
        """
        lines = []
        for name, desc in self._capability_registry.items():
            description = getattr(desc, "description", "")
            lines.append(f"- {name}: {description}")
        return "\n".join(lines)

    def _format_plan(self, plan: Plan) -> str:
        """Format a :class:`Plan` instance into a human-readable multi-line string."""
        lines = []
        for task in plan.tasks:
            # Named ``task_id`` to avoid shadowing the built-in ``id``.
            task_id = task.task_id
            sub_task_obj = task.sub_task_objective
            task_status = task.status
            metadata = task.metadata
            lines.append(
                f"- Task ID:{task_id}\n sub_task_obj: {sub_task_obj} \n task_status: {task_status}\n metadata: {metadata}"
            )
        return "\n".join(lines)

    def _format_supervisor_input_prompt(self, ctx: RuntimeState) -> str:
        """Build the composite prompt passed to the supervisor agent.

        The prompt includes the overall objective, a summarized status board of
        all tasks in the plan, and a list of available capabilities.
        """
        # Pre-format the plan to ensure the LLM sees a clean "Status Board"
        capability_display = self._format_capabilities()

        plan_display_lines = []
        for t in ctx.plan.values():
            line = (
                f"- Task ID: {t.task_id} | Status: [{t.status.value}] "
                f"| Final: {getattr(t, 'is_final', False)} "
                f"| Objective: {t.sub_task_objective} "
                f"| Dependencies: {t.sub_task_dependencies}"
            )

            fb = getattr(t, "task_feedback", None)
            if fb is not None:
                # Read the QA verdict and reasoning defensively; missing
                # fields are simply omitted from the status line.
                verdict = getattr(fb, "passed", None)
                summary = getattr(fb, "reasoning", None)

                line += "\n  QA: "
                if verdict is not None:
                    line += f"verdict={verdict} "
                if summary:
                    line += f"\n    summary: {summary}"

            plan_display_lines.append(line)

        plan_display = "\n".join(plan_display_lines)

        # Capture the timestamp once so ``now`` and ``current_year`` always agree.
        now = datetime.now()
        prompt = SUPERVISOR_INPUT_PROMPT.format(
            objective=ctx.objective,
            plan_display=plan_display,
            agent_display=capability_display,
            now=now,
            current_year=now.year,
        )

        if self._last_scheduler_report:
            prompt += (
                "\n\n### SYSTEM SCHEDULER NOTES (deterministic)\n"
                + self._last_scheduler_report.strip()
            )

        return prompt

    def _format_critic_input_prompt(self, task: TaskItem, ctx: RuntimeState) -> str:
        """Construct the evaluation prompt sent to the critic agent.

        The critic receives the overall objective, the ``TaskItem`` under
        evaluation, the worker's structured ``TaskResult`` (if any), and any
        relevant in-memory documents from the runtime state.

        Note: this harness is currently in-memory focused; do not assume any
        filesystem persistence.
        """
        worker_output = task.result.model_dump_json(indent=2) if task.result else "null"

        _prompt = f"""

            Evaluate if the following worker output completed the specified task it was given.

            Overall Objective:
            {ctx.objective}

            Sub Task Definition (TaskItem):
            {task.model_dump_json(indent=2)}

            Worker Output (TaskResult):
            {worker_output}

            In-memory documents / scratchpads:
            {ctx.document_store}

            """
        return _prompt

    def _is_context_limit_error(self, exc: Exception) -> bool:
        """Heuristic detection of "context length exceeded" errors.

        Different providers/local gateways surface these differently (OpenAI-style
        400s, Anthropic "prompt too long", llama.cpp "context overflow", etc.).
        """
        msg = str(exc).lower()
        needles = [
            "context length",
            "maximum context",
            "max context",
            "prompt is too long",
            "too many tokens",
            "context overflow",
            "exceeds the context",
            "token limit",
        ]
        if any(n in msg for n in needles):
            return True

        if isinstance(exc, HTTPStatusError):
            # Common for OpenAI-compatible APIs.
            try:
                data = exc.response.json()
            except Exception:
                data = None

            if exc.response.status_code in (400, 413):
                # 413 can happen on some proxies when payload is too large.
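                if data and isinstance(data, dict):
                    # ``error`` is usually a dict, but it may be a plain string
                    # on some gateways; ``code`` may also be non-string.
                    err = data.get("error") or {}
                    if isinstance(err, dict):
                        code = str(err.get("code") or "").lower()
                        emsg = str(err.get("message") or "").lower()
                    else:
                        code, emsg = "", str(err).lower()
                    if "context" in code or "context" in emsg:
                        return True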

        return False

    def _build_resume_prompt(self, step: TaskItem, error: Exception) -> str:
        """Build a minimal resume prompt after a context overflow.

        We intentionally keep this short; the sub-agent should reconstruct its
        progress using task metadata (scratch notes / checkpoints).
        """
        checkpoint = step.metadata.get("scratch_notes", "")
        checkpoint_preview = checkpoint
        if len(checkpoint_preview) > 6_000:
            checkpoint_preview = (
                checkpoint_preview[:6_000] + "\n...[checkpoint truncated]..."
            )

        return f"""
A previous attempt to execute this task failed due to context/window limits.

Task:
- task_id: {step.task_id}
- capability: {step.capability}
- sub_task_objective: {step.sub_task_objective}

Overall objective:
{self.objective}

Checkpoint / scratch notes saved so far (authoritative):
{checkpoint_preview if checkpoint_preview else '<none>'}

Recovery instructions (IMPORTANT):
- Continue the task from the checkpoint above.
- Keep responses concise. Avoid pasting large blobs.
- If you need prior task outputs, call `get_task_result(task_id=..., max_chars=6000)` (or smaller).
- After each major step, call `append_scratch_note(task_id={step.task_id}, note=...)` with a short checkpoint:
  "what I did" + "what I will do next" + "open questions".
- If you feel you're approaching the context limit again, STOP calling tools and output the best possible `TaskResult`.

Error that triggered recovery (for debugging only):
{str(error)}
"""

    async def add_task(
        self,
        ctx: RunContext[RuntimeState],
        sub_task_objective: str,
        capability: str,
        dependencies: list[int] | None = None,
        metadata: dict | None = None,
    ) -> int:
        """Tool: Add Task.

        Note: In ``planning_mode="fixed"`` this tool is not registered on the
        supervisor agent, but it may still be called directly in Python.

        Create and register a new ``TaskItem`` in the current plan/DAG when
        more work is required to achieve the overall objective.

        The supervisor should specify any upstream dependencies so that
        execution order can be enforced.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            sub_task_objective: Natural-language objective for the new task.
            capability: Name of the capability / sub-agent that should execute
                this task.
            dependencies: Optional list of task IDs that must complete
                successfully before this task can run.
            metadata: Optional free-form metadata dictionary attached to the task.

        Returns:
            The integer ``task_id`` assigned to the newly created task.
        """
        async with self._plan_lock:
            plan = ctx.deps.plan
            new_id = ctx.deps.next_task_id
            ctx.deps.next_task_id += 1

            task = TaskItem(
                task_id=new_id,
                overall_objective=ctx.deps.objective,
                sub_task_objective=sub_task_objective,
                capability=capability,
                sub_task_dependencies=dependencies or [],
                metadata=metadata or {},
                status=TaskStatus.READY,
            )
            plan[new_id] = task
            self._record_event(
                "task_added",
                {
                    "task": task.model_dump(),
                    "next_task_id": ctx.deps.next_task_id,
                },
            )
            return new_id

    async def cancel_task(
        self, ctx: RunContext[RuntimeState], task_id: int, reason: str
    ) -> str:
        """Tool: Cancel Task.

        Mark a task as ``CANCELLED`` when it is no longer relevant or when
        a failure in an upstream dependency makes it impossible to complete.
        The task stays in the plan so that its history is preserved.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            task_id: Identifier of the task to cancel.
            reason: Human-readable explanation for the cancellation; also
                stored as the task's ``error_msg``.

        Returns:
            A confirmation message, or an error message if ``task_id`` is not
            in the plan.
        """
        async with self._plan_lock:
            if task_id in ctx.deps.plan:
                task = ctx.deps.plan[task_id]
                # Instead of deleting, mark as CANCELLED to keep history
                task.status = TaskStatus.CANCELLED
                task.error_msg = reason
                self._record_task_status_event(
                    task_id,
                    TaskStatus.CANCELLED,
                    reason=reason,
                    error_msg=reason,
                )
                return f"Task {task_id} cancelled. Reason: {reason}"
            return f"Error: Task {task_id} not found."

    async def patch_task(
        self,
        ctx: RunContext[RuntimeState],
        task_id: int,
        sub_task_objective: Optional[str] = None,
        dependencies: Optional[List[int]] = None,
    ) -> str:
        """Tool: Patch Task.

        Note: In ``planning_mode="fixed"`` this tool is not registered on the
        supervisor agent, but it may still be called directly in Python.

        Update an existing task's objective and/or dependency list in-place.
        A ``task_patched`` event is recorded only when at least one field is
        supplied.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            task_id: Identifier of the task to modify.
            sub_task_objective: New sub-task objective, if changing. An empty
                string is ignored.
            dependencies: Updated list of dependency IDs, if changing. Pass an
                empty list to clear all dependencies.

        Returns:
            A status message: success, or ``"Task not found."`` if ``task_id``
            is not in the plan.
        """
        async with self._plan_lock:
            task = ctx.deps.plan.get(task_id)
            if not task:
                return "Task not found."

            payload: Dict[str, Any] = {"task_id": task_id}

            if sub_task_objective:
                task.sub_task_objective = sub_task_objective
                payload["sub_task_objective"] = task.sub_task_objective
            if dependencies is not None:
                task.sub_task_dependencies = dependencies
                payload["dependencies"] = task.sub_task_dependencies

            if len(payload) > 1:
                self._record_event("task_patched", payload)

            return f"Task {task_id} updated successfully."

    async def mark_final_task(
        self,
        ctx: RunContext[RuntimeState],
        task_id: int,
        reason: str | None = None,
    ) -> str:
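        """Tool: Mark Final Task.

        Mark exactly one task as the final deliverable for the run. Any task
        previously marked as final is unmarked first.

        This tool should be called by the supervisor (planner/orchestrator), not
        by workers. It enables deterministic ``final_result`` selection on resume.

        The invariant enforced is: at most one task has ``is_final=True``.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            task_id: Identifier of the task to mark as final.
            reason: Optional explanation recorded with the ``final_task_set``
                event.

        Returns:
            A confirmation message, or an error message if ``task_id`` is not
            in the plan.
        """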
        async with self._plan_lock:
            if task_id not in ctx.deps.plan:
                return f"Error: No task with id {task_id} found in plan."

            for t in ctx.deps.plan.values():
                t.is_final = False

            ctx.deps.plan[task_id].is_final = True

            payload: Dict[str, Any] = {"task_id": task_id}
            if reason:
                payload["reason"] = reason
            self._record_event("final_task_set", payload)

            return f"Task {task_id} marked as final."
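
    # Usage sketch for the plan-editing tools above (illustrative only, not
    # part of the library). It assumes ``agent`` is an already constructed
    # DeepAgent and ``ctx`` is the RunContext[RuntimeState] that the
    # supervisor tools receive; how either is obtained is not shown in this
    # excerpt. The objectives are hypothetical; "producer_agent" is the
    # capability name referenced by _select_final_result.
    #
    #     research_id = await agent.add_task(
    #         ctx, "Collect sources", capability="producer_agent"
    #     )
    #     report_id = await agent.add_task(
    #         ctx,
    #         "Write the final report",
    #         capability="producer_agent",
    #         dependencies=[research_id],
    #     )
    #     await agent.patch_task(ctx, report_id, dependencies=[])
    #     await agent.mark_final_task(ctx, report_id, reason="final deliverable")
    #
    # add_task assigns READY immediately; the next _scheduler_pass demotes the
    # task to PENDING while its dependencies are not COMPLETED.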

    def _is_terminal_status(self, status: TaskStatus) -> bool:
        """Return True if a task is in a terminal state.

        Note: ERRORED is intentionally treated as non-terminal; the supervisor
        may still choose to patch the task and rerun it.
        """
        return status in {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED}

    async def _scheduler_pass(self, ctx: RuntimeState) -> str:
        """Deterministic scheduler pass.

        This pass performs small, non-LLM state normalization to improve autonomy:

        - Promote PENDING -> READY when all dependencies are COMPLETED.
        - Demote READY -> PENDING if dependencies are not satisfied (keeps the
          status board honest).
        - Mark tasks with unknown capability as ERRORED (unless terminal).

        Returns a human-readable report injected into the next supervisor prompt.
        """
        changes: list[str] = []
        warnings: list[str] = []

        async with self._plan_lock:
            for task_id, task in sorted(ctx.plan.items(), key=lambda kv: kv[0]):
                # Unknown capability detection.
                if task.capability and task.capability not in self._capability_registry:
                    if not self._is_terminal_status(task.status):
                        if task.status != TaskStatus.ERRORED:
                            changes.append(
                                f"- Task {task_id}: {task.status.value} -> errored (unknown capability: {task.capability!r})"
                            )
                            task.status = TaskStatus.ERRORED
                            task.error_msg = f"Unknown capability: {task.capability!r}"
                            self._record_task_status_event(
                                task_id,
                                TaskStatus.ERRORED,
                                reason="unknown capability",
                                error_msg=task.error_msg,
                            )
                        else:
                            task.error_msg = f"Unknown capability: {task.capability!r}"
                    continue

                # Dependency-based readiness propagation.
                deps_ok = self._dependencies_satisfied(task, ctx)

                if task.status == TaskStatus.PENDING and deps_ok:
                    task.status = TaskStatus.READY
                    changes.append(
                        f"- Task {task_id}: pending -> ready (deps satisfied)"
                    )
                    self._record_task_status_event(
                        task_id,
                        TaskStatus.READY,
                        reason="dependencies_satisfied",
                    )

                # Keep READY tasks honest if deps are not actually satisfied.
                if task.status == TaskStatus.READY and not deps_ok:
                    task.status = TaskStatus.PENDING
                    changes.append(
                        f"- Task {task_id}: ready -> pending (deps not satisfied)"
                    )
                    self._record_task_status_event(
                        task_id,
                        TaskStatus.PENDING,
                        reason="dependencies_not_met",
                    )

        if not changes and not warnings:
            return "No scheduler changes this cycle."

        out: list[str] = []
        if changes:
            out.append("Status normalization:")
            out.extend(changes)
        if warnings:
            out.append("Warnings:")
            out.extend(warnings)
        return "\n".join(out)

    def _select_final_result(self, runtime_state: RuntimeState) -> TaskResult | None:
        """Select the run's final output deterministically.

        Priority:
        1) A COMPLETED task with `is_final=True`.
        2) A COMPLETED task with non-empty `result.detailed_output`.
        3) A COMPLETED `producer_agent` task.
        4) Otherwise the newest COMPLETED task with any result.

        This ensures checkpoint resume returns a stable final deliverable even
        if the supervisor immediately declares completion.
        """
        completed: list[TaskItem] = [
            t
            for t in runtime_state.plan.values()
            if t.status == TaskStatus.COMPLETED and t.result is not None
        ]
        if not completed:
            return None

        finals = [t for t in completed if getattr(t, "is_final", False)]
        if finals:
            return max(finals, key=lambda t: t.task_id).result

        with_detail = [
            t
            for t in completed
            if (t.result and (t.result.detailed_output or "").strip())
        ]
        if with_detail:
            return max(with_detail, key=lambda t: t.task_id).result

        producers = [t for t in completed if t.capability == "producer_agent"]
        if producers:
            return max(producers, key=lambda t: t.task_id).result

        return max(completed, key=lambda t: t.task_id).result

    def _build_deadlock_report(
        self, ctx: RuntimeState, decision: SupervisorDecision | None = None
    ) -> str:
        """Explain why no tasks ran in the current cycle."""

        status_counts = Counter(t.status.value for t in ctx.plan.values())
        runnable: list[int] = []
        blocked: list[str] = []

        for task_id, task in sorted(ctx.plan.items(), key=lambda kv: kv[0]):
            if self._is_terminal_status(task.status):
                continue

            if task.status in {
                TaskStatus.READY,
                TaskStatus.RERUN,
            } and self._dependencies_satisfied(task, ctx):
                runnable.append(task_id)
                continue

            # Compute a human-readable reason.
            if task.capability not in self._capability_registry:
                blocked.append(
                    f"- Task {task_id} [{task.status.value}]: unknown capability {task.capability!r}"
                )
                continue

            if task.sub_task_dependencies:
                missing = [d for d in task.sub_task_dependencies if d not in ctx.plan]
                if missing:
                    blocked.append(
                        f"- Task {task_id} [{task.status.value}]: missing deps {missing}"
                    )
                    continue

                unmet = [
                    d
                    for d in task.sub_task_dependencies
                    if ctx.plan.get(d) is not None
                    and ctx.plan[d].status != TaskStatus.COMPLETED
                ]
                if unmet:
                    blocked.append(
                        f"- Task {task_id} [{task.status.value}]: waiting on deps {unmet}"
                    )
                    continue

            blocked.append(f"- Task {task_id} [{task.status.value}]: not runnable")

        lines: list[str] = []
        lines.append("Deadlock / no-progress report:")
        lines.append(f"- status_counts: {dict(status_counts)}")
        if decision is not None:
            lines.append(f"- supervisor_requested: {decision.tasks_to_execute or []}")
        lines.append(f"- runnable_now: {runnable}")
        if blocked:
            lines.append("- blocked_examples:")
            # keep this short to avoid prompt bloat
            lines.extend(blocked[:12])

        return "\n".join(lines)

    @traced()
    async def run(self) -> DeepAgentRunResult:
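        """Run the full DeepAgent control loop.

        The loop ends when the supervisor declares completion and the final
        task is COMPLETED, when the configured step limit is reached, or after
        3 consecutive cycles without progress.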

        If a ``seed_plan`` was supplied at construction time, it is loaded into the
        runtime state before the supervisor loop begins.

        This method repeatedly:

        * Invokes the supervisor to decide which tasks to execute next.
        * Executes ready tasks in parallel via their associated sub-agents.
        * Sends results to the critic for QA and status updates.
        * Optionally checkpoints state between iterations.

        Returns:
            A ``DeepAgentRunResult`` containing the objective, the final output,
            the full plan, the final ``RuntimeState``, and any errors collected
            during the run.
        """
        runtime_state = self._initialize_runtime_state(
            objective=self.objective, registry=self._capability_registry
        )
        self._apply_seed_plan(runtime_state)
        self._replay_checkpoint(runtime_state)

        errors: list[str] = []
        no_progress_cycles = 0

        step_count = 0
        stop_execution = False
        while step_count < self._max_steps and not stop_execution:

            logger.info(f"\n--- DeepAgent Cycle {step_count} ---")

            # Deterministic scheduler pass to normalize readiness and surface issues.
            self._last_scheduler_report = await self._scheduler_pass(runtime_state)

            current_instruction = (
                BOOTSTRAP_INSTURCT
                if len(runtime_state.plan) == 0
                else ORCHESTRATION_INSTRUCT
            )
            supervisor_response = await self._supervisor_agent.run(
                self._format_supervisor_input_prompt(runtime_state),
                deps=runtime_state,
                instructions=current_instruction,
            )
            supervisor_response = supervisor_response.output

            self._record_event(
                "supervisor_decision",
                supervisor_response.model_dump(),
            )

            if supervisor_response.all_tasks_completed:
                # Deterministic guardrail: do not allow "completion" unless the
                # task marked as final is actually COMPLETED.
                final_tasks = [
                    t for t in runtime_state.plan.values() if getattr(t, "is_final", False)
                ]

                completion_ok = True
                reasons: list[str] = []

                if not final_tasks:
                    completion_ok = False
                    reasons.append("no task is marked Final: True")
                elif len(final_tasks) > 1:
                    completion_ok = False
                    reasons.append(
                        f"multiple tasks are marked Final: True ({[t.task_id for t in final_tasks]})"
                    )
                else:
                    ft = final_tasks[0]
                    if ft.status != TaskStatus.COMPLETED:
                        completion_ok = False
                        reasons.append(
                            f"final task {ft.task_id} status is {ft.status.value!r} (expected 'completed')"
                        )
                    if ft.result is None:
                        completion_ok = False
                        reasons.append(f"final task {ft.task_id} has no TaskResult")

                if not completion_ok:
                    msg = (
                        "Supervisor returned all_tasks_completed=True, but completion invariants "
                        f"are not met: {', '.join(reasons)}. Overriding to continue."
                    )
                    logger.warning(msg)
                    self._last_scheduler_report = (
                        self._last_scheduler_report
                        + "\n\nCOMPLETION OVERRIDE (deterministic):\n- "
                        + msg
                    ).strip()

                    # Treat this as a no-progress cycle so we eventually fail-safe.
                    no_progress_cycles += 1
                    if self.checkpoint:
                        self._checkpoint_state(runtime_state)
                    runtime_state.runtime_steps += 1
                    step_count += 1
                    continue

                logger.info(
                    "--- Supervisor declared completion and final task is completed. Ending execution loop. ---"
                )
                stop_execution = True
                break

            logger.info("--- Executing Tasks ---")
            # execute tasks that are ready to run and await responses
            task_results = await self._execute_ready_tasks(
                supervisor_response, runtime_state
            )

            # NOTE: `execute(...)` mutates the canonical TaskItem stored in `runtime_state.plan`
            # in-place (it receives the same object reference). Do NOT overwrite
            # `runtime_state.plan[task_id]` with returned TaskItems here; that can clobber
            # concurrent metadata updates (e.g. scratch notes/checkpoints).

            if len(task_results) == 0:
                # No tasks ran this cycle. This is not necessarily terminal in a
                # dynamic planner: we may be blocked on deps, have errored tasks
                # that need patching, or need the supervisor to add new nodes.
                no_progress_cycles += 1
                deadlock = self._build_deadlock_report(
                    runtime_state, supervisor_response
                )
                self._last_scheduler_report = (
                    self._last_scheduler_report + "\n\n" + deadlock
                ).strip()

                logger.info(
                    f"No executable tasks this cycle (no_progress_cycles={no_progress_cycles}). Continuing."
                )

                # Prevent infinite loops if the supervisor cannot make progress.
                if no_progress_cycles >= 3:
                    msg = (
                        "No progress after 3 consecutive cycles (no tasks executed). "
                        "Stopping to avoid infinite loop. "
                        "See SYSTEM SCHEDULER NOTES in the final cycle for details."
                    )
                    logger.warning(msg)
                    errors.append(msg)
                    stop_execution = True

                if self.checkpoint:
                    self._checkpoint_state(runtime_state)

                runtime_state.runtime_steps += 1
                step_count += 1
                continue

            no_progress_cycles = 0

            logger.info(f"Number of tasks executed: {len(task_results)}")
            # go through responses and evaluate if they have completed the task
            for task_result in task_results or []:
                logger.info(f"--- Evaluating Task Result for {task_result.task_id} ---")

                qa_response = await self._critic_agent.run(
                    self._format_critic_input_prompt(task_result, runtime_state),
                    deps=runtime_state,
                )
                qa_response = qa_response.output
                if self.verbose:
                    logger.info("--- QA Response ---")
                    logger.info(qa_response.model_dump_json(indent=2))

                task = runtime_state.plan[task_result.task_id]

                # deterministic transition based on critic
                self.handle_critic_result(task, qa_response)

            if self.checkpoint:
                self._checkpoint_state(runtime_state)

            runtime_state.runtime_steps += 1
            step_count += 1

        return_result = DeepAgentRunResult(
            objective=self.objective,
            final_result=self._select_final_result(runtime_state),
            plan=runtime_state.plan,
            runtime_state=runtime_state,
            errors=errors,
        )

        return return_result

    def _apply_seed_plan(self, runtime_state: RuntimeState) -> None:
        """Seed ``runtime_state.plan`` from ``self.seed_plan`` (if provided).

        This is used to support user-specified plans. It validates:

        * Unique task IDs.
        * Dependencies refer to existing tasks.

        It also updates ``runtime_state.next_task_id``.
        """
        if self.seed_plan is None:
            return

        tasks = list(self.seed_plan.tasks or [])
        if not tasks:
            return

        plan_dict: dict[int, TaskItem] = {}
        for t in tasks:
            if t.task_id in plan_dict:
                raise ValueError(f"Duplicate task_id in seed_plan: {t.task_id}")
            # Ensure the overall objective is consistent.
            if not getattr(t, "overall_objective", None):
                t.overall_objective = runtime_state.objective
            plan_dict[t.task_id] = t

        for t in plan_dict.values():
            for dep_id in t.sub_task_dependencies or []:
                if dep_id not in plan_dict:
                    raise ValueError(
                        f"seed_plan task {t.task_id} depends on missing task {dep_id}"
                    )

        runtime_state.plan = plan_dict
        runtime_state.next_task_id = max(plan_dict.keys()) + 1

    def _dependencies_satisfied(self, step: TaskItem, ctx: RuntimeState) -> bool:
        """Return ``True`` if all of a task's dependencies are fully satisfied.

        Currently a dependency is considered satisfied only if the dependent
        task exists and is in the ``COMPLETED`` state.
        """
        # A dependency counts as satisfied only when it is COMPLETED; extend this set to accept other statuses.
        required_statuses = {TaskStatus.COMPLETED}
        for dep_id in step.sub_task_dependencies or []:
            dep_task = ctx.plan.get(dep_id)
            if dep_task is None:
                # Be conservative: if the dependency is missing, treat it as unsatisfied
                return False
            if dep_task.status not in required_statuses:
                return False
        return True

    async def _cascade_cancellations(self, ctx: RuntimeState):
        """Transitively marks downstream tasks as CANCELLED if they rely
        on an upstream task that has been cancelled.
        """
        async with self._plan_lock:
            changed = True
            while changed:
                changed = False
                for task in ctx.plan.values():
                    # We only care about steps waiting to run or currently eligible
                    if task.status in {TaskStatus.PENDING, TaskStatus.READY}:
                        for dep_id in task.sub_task_dependencies or []:
                            dep_task = ctx.plan.get(dep_id)
                            if dep_task and dep_task.status == TaskStatus.CANCELLED:
                                task.status = TaskStatus.CANCELLED
                                task.error_msg = (
                                    f"Upstream dependency Task {dep_id} was cancelled."
                                )
                                self._record_task_status_event(
                                    task.task_id,
                                    TaskStatus.CANCELLED,
                                    reason=f"Upstream task {dep_id} cancelled; dropping downstream branch.",
                                    error_msg=task.error_msg,
                                )
                                changed = True
                                break

    @traced(capture_input=False)
    async def _execute_ready_tasks(
        self, tasks: SupervisorDecision, ctx: RuntimeState
    ) -> list[TaskItem]:
        """Execute all tasks selected by the supervisor that are ready to run.

        Tasks whose dependencies are satisfied are executed concurrently using
        an ``asyncio.TaskGroup``. The returned list contains the updated
        ``TaskItem`` instances after execution.
        """
        # 1. Clean out the graph first. If the supervisor just cancelled something via a tool,
        # this ensures its downstream tasks are marked CANCELLED before anything is scheduled.
        await self._cascade_cancellations(ctx)

        # Dedupe while preserving order (supervisor can occasionally emit duplicates).
        requested_ids: list[int] = list(dict.fromkeys(tasks.tasks_to_execute or []))

        # Supervisor might reference missing IDs.
        candidate_steps: list[TaskItem] = [
            ctx.plan[task_id] for task_id in requested_ids if task_id in ctx.plan
        ]

        allowed_statuses = {TaskStatus.READY, TaskStatus.RERUN}

        # Determine which steps are eligible based on status+deps.
        # We'll "claim" them (set RUNNING) under `_plan_lock` below to prevent double-scheduling.
        ready_steps = [
            step
            for step in candidate_steps
            if step.status in allowed_statuses
            and self._dependencies_satisfied(step, ctx)
        ]
        # Nothing is eligible to run this cycle.
        if not ready_steps:
            return []

        # 2. Claim tasks (READY/RERUN -> RUNNING) atomically so we don't schedule the same
        # task twice in parallel.
        claimed_steps: list[TaskItem] = []
        async with self._plan_lock:
            for step in ready_steps:
                # step is a reference to ctx.plan[task_id]
                if step.status not in allowed_statuses:
                    continue
                # deps can change while we awaited the lock (other tasks completing); re-check.
                if not self._dependencies_satisfied(step, ctx):
                    continue
                step.status = TaskStatus.RUNNING
                claimed_steps.append(step)
                self._record_task_status_event(
                    step.task_id,
                    TaskStatus.RUNNING,
                    reason="claimed_for_execution",
                )

        if not claimed_steps:
            return []

        # 3. Prepare the concurrent coroutines
        ready_tasks = []
        for step in claimed_steps:
            # get supervisor feedback if any for this task
            if (
                tasks.feedback_to_subagents
                and step.task_id in tasks.feedback_to_subagents
            ):
                if step.parameters is None:
                    # create if None
                    step.parameters = {}
                step.parameters["supervisor_feedback"] = (
                    tasks.feedback_to_subagents.get(step.task_id)
                )

            logger.info(
                f"- {step.task_id}: {step.sub_task_objective} using {step.capability}"
            )
            logger.info(f"  Dependencies: {step.sub_task_dependencies}")
            logger.info(f"  Status: {step.status}")
            logger.info(f"  Result: {step.result}")
            logger.info("\n")

            # Look up the worker registered for the capability chosen by the plan or supervisor.
            worker = self._capability_registry.get(step.capability)
            if worker:
                # `execute` wraps the agent run and updates the step status when it finishes.
                ready_tasks.append(self.execute(worker.tool_func, step, ctx))
            else:
                # No such capability; mark errored so supervisor/QA can see what happened.
                step.status = TaskStatus.ERRORED
                step.error_msg = f"Unknown capability: {step.capability!r}"
                self._record_task_status_event(
                    step.task_id,
                    TaskStatus.ERRORED,
                    reason="unknown capability",
                    error_msg=step.error_msg,
                )

        # 4. Run the claimed tasks concurrently. `execute` records failures on the task
        # itself (ERRORED) so the supervisor can see them in the next cycle.
        logger.info("--- Executing Ready Tasks ---")
        task_results = []
        async with TaskGroup() as tg:
            for task in ready_tasks:
                task_results.append(tg.create_task(task))

        results = [t.result() for t in task_results]
        logger.info("--- All Ready Tasks Completed ---")
        return results

    @traced(run_type="task", capture_input=False)
    @retry(wait=wait_exponential_jitter(), reraise=True, stop=stop_after_attempt(3))
    async def execute(
        self, sub_agent: Agent, step: TaskItem, runtime_state: RuntimeState
    ) -> TaskItem:
        """Execute a sub-agent for a single task and record the result.

        Builds a task-specific prompt (with optional supervisor feedback),
        runs the provided ``sub_agent``, and updates the ``TaskItem`` status
        and result based on success or failure.
        """

        # check to see if there was feedback or additional instructions for the task from the supervisor
        _feedback_for_agent = None
        if isinstance(step.parameters, dict):
            _feedback_for_agent = step.parameters.get("supervisor_feedback")

        if step.capability == "producer_agent":

            user_prompt = f"""
            Overall objective:
            {self.objective}

            You are the final synthesis agent.
            - First, call `list_completed_tasks` to see all completed upstream tasks.
            - For each task that is relevant to the objective (especially research tasks), call `get_task_result(task_id=...)`.
            - THEN, write a single, coherent comparative analysis answering the objective.
            - You MUST explicitly integrate evidence from ALL relevant completed tasks (e.g. Task 1 and Task 2 in this run).
            """

            if _feedback_for_agent:

                user_prompt += f"""

                    Supervisor feedback / additional instructions for this execution:

                    {_feedback_for_agent}
                    """

            user_prompt += """
                    Your job:
                    - Use ONLY the completed sub-task results from this run.
                    - Combine their findings into a single, coherent final answer.
                    - Follow your system prompt instructions for citations and final TaskResult structure.
                    - Do NOT request new research or create new sub-tasks.
                    """
        else:
            user_prompt = f"""
                You are executing TaskItem:

            {step.model_dump_json(indent=2)}

                Overall objective:
                {self.objective}

                """
            if _feedback_for_agent:
                user_prompt += f"""

                Supervisor feedback / additional instructions for this execution:
                {_feedback_for_agent}
                """

            user_prompt += """

            ONLY act on this sub-task and any feedback. Do not re-plan or change the task.
            """
        task_deps = TaskRunDeps(runtime_state=runtime_state, task=step)

        # Help smaller-context models avoid blowing up in a single long tool-run.
        # This doesn't guarantee safety (tool output can still be large), but combined
        # with truncated tool outputs and scratch checkpoints it greatly improves durability.
        user_prompt += f"""

Context-budget note:
- You may be running on a smaller-context model.
- Prefer small tool outputs. When calling tools that can return large text, request truncation.
- Checkpoint progress frequently via `append_scratch_note(task_id={step.task_id}, note=...)`.
"""

        max_resume_attempts = 2
        last_error: Exception | None = None

        for resume_attempt in range(max_resume_attempts + 1):
            tool_call_limit = 20 if resume_attempt == 0 else 10

            try:
                result = await sub_agent.run(
                    user_prompt,
                    deps=task_deps,
                    usage_limits=UsageLimits(tool_calls_limit=tool_call_limit),
                )
                step.result = result.output
                step.status = TaskStatus.NEEDS_REVIEW
                step.error_msg = None
                self._record_task_result(step)
                self._record_task_status_event(step.task_id, TaskStatus.NEEDS_REVIEW)
                return step
            except Exception as e:
                last_error = e
                if (
                    self._is_context_limit_error(e)
                    and resume_attempt < max_resume_attempts
                ):
                    # Record the incident and attempt a "fresh run" using scratch checkpoints.
                    overflow_entry = {
                        "at": datetime.now().isoformat(),
                        "attempt": resume_attempt,
                        "error": str(e),
                    }
                    step.metadata.setdefault("context_overflow", [])
                    step.metadata["context_overflow"].append(overflow_entry)
                    self._record_metadata_append(
                        step.task_id, "context_overflow", overflow_entry
                    )

                    # Build a minimal prompt to continue from checkpoint notes.
                    user_prompt = self._build_resume_prompt(step, e)
                    continue

                step.status = TaskStatus.ERRORED
                step.error_msg = str(e)
                self._record_task_status_event(
                    step.task_id,
                    TaskStatus.ERRORED,
                    error_msg=step.error_msg,
                )
                return step

        # Should be unreachable, but keep a safe fallback.
        step.status = TaskStatus.ERRORED
        step.error_msg = str(last_error) if last_error else "Unknown error"
        self._record_task_status_event(
            step.task_id, TaskStatus.ERRORED, error_msg=step.error_msg
        )
        return step

    async def update_task_status(
        self, ctx: RunContext[RuntimeState], task_id: int, status: TaskStatus
    ):
        """Tool: Update Task Status.

        Primarily used by the supervisor to transition a task between states
        (e.g. to ``READY`` or ``COMPLETED``) once dependencies are met or QA
        has passed.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            task_id: Identifier of the task to update.
            status: New :class:`TaskStatus` value for the task.
        """
        async with self._plan_lock:
            task = ctx.deps.plan.get(task_id)
            if task is None:
                return f"Error: No task with id {task_id} found in plan. Be sure task_id actually exists."
            task.status = status
            self._record_task_status_event(task_id, status)
            return f"Status for {task_id} is now {status}."

    def handle_critic_result(self, task: TaskItem, review: TaskQAResult):
        """Apply the critic's QA result to a task and emit checkpoint events."""
        task.attempt_count += 1
        task.task_feedback = review

        self._record_event(
            "critic_feedback",
            {
                "task_id": task.task_id,
                "feedback": review.model_dump(),
                "attempt_count": task.attempt_count,
            },
        )

        if review.passed:
            task.status = TaskStatus.COMPLETED
            task.error_msg = None
            self._record_task_status_event(task.task_id, task.status)
            return

        if task.attempt_count >= task.max_attempts:
            task.status = TaskStatus.FAILED
            task.error_msg = (
                f"Max retries reached ({task.attempt_count}/{task.max_attempts})."
            )
            self._record_task_status_event(
                task.task_id, task.status, error_msg=task.error_msg
            )
            return

        task.status = TaskStatus.RERUN
        task.error_msg = None
        task.sub_task_objective = f"{task.sub_task_objective}\n\nPrevious attempt failed review; feedback: {review.reasoning}"
        self._record_task_status_event(task.task_id, task.status)
        self._record_event(
            "task_patched",
            {
                "task_id": task.task_id,
                "sub_task_objective": task.sub_task_objective,
            },
        )

    async def view_qa_report(self, ctx: RunContext[RuntimeState], task_id: int) -> str:
        """Tool: View QA Report.

        Return the full serialized QA report for a specific task, if one is
        available. This is typically called by the supervisor when additional
        inspection of the critic's reasoning is required.

        Args:
            ctx: ``RunContext`` carrying the current ``RuntimeState``.
            task_id: Identifier of the task whose QA report should be viewed.

        Returns:
            A JSON-formatted string representation of the stored
            :class:`TaskQAResult`, or a message describing why no report is
            available.
        """
        async with self._plan_lock:
            task = ctx.deps.plan.get(task_id)
            logger.info(f"Checking QA Report for task: {task_id}")
            if task is None:
                return f"No task with id {task_id}."

            fb = getattr(task, "task_feedback", None)
            if fb is None:
                return f"No QA feedback found for task {task_id}."
            task.metadata.setdefault("qa", {})
            task.metadata["qa"]["report_viewed"] = True

            # Return the full QA report as indented JSON.
            return fb.model_dump_json(indent=2)
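
The critic-driven transition in handle_critic_result above reduces to a three-way branch: a passing review completes the task, an exhausted attempt budget fails it, and anything else schedules a rerun with the feedback appended to the objective. The following is a minimal, self-contained sketch of that branch rather than Pydantask code. Status, Task, and apply_review are hypothetical stand-ins, and the max_attempts values are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    # Hypothetical subset of the statuses used in the listing above.
    NEEDS_REVIEW = "needs_review"
    COMPLETED = "completed"
    RERUN = "rerun"
    FAILED = "failed"


@dataclass
class Task:
    # Hypothetical stand-in for TaskItem; max_attempts=3 is an assumed default.
    objective: str
    max_attempts: int = 3
    attempt_count: int = 0
    status: Status = Status.NEEDS_REVIEW


def apply_review(task: Task, passed: bool, reasoning: str = "") -> Status:
    """Count the attempt, then pick COMPLETED, FAILED, or RERUN."""
    task.attempt_count += 1
    if passed:
        task.status = Status.COMPLETED
    elif task.attempt_count >= task.max_attempts:
        task.status = Status.FAILED
    else:
        task.status = Status.RERUN
        # Carry the critic's feedback into the next attempt.
        task.objective += (
            f"\n\nPrevious attempt failed review; feedback: {reasoning}"
        )
    return task.status


task = Task(objective="Summarize findings", max_attempts=2)
first = apply_review(task, passed=False, reasoning="missing citations")
second = apply_review(task, passed=False, reasoning="still missing citations")
```

With max_attempts=2, the first failed review yields a rerun and the second one fails the task, so the attempt budget counts reviews rather than executions.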

__aenter__() async

Enter the async context manager and return this DeepAgent.

Allows async with DeepAgent(...) as agent: ... usage.

Source code in src/pydantask/agents/agent.py
async def __aenter__(self) -> "DeepAgent":
    """Enter the async context manager and return this ``DeepAgent``.

    Allows ``async with DeepAgent(...) as agent: ...`` usage.
    """
    return self

__aexit__(exc_type, exc, tb) async

Exit the async context manager, ensuring resources are cleaned up.

Source code in src/pydantask/agents/agent.py
async def __aexit__(self, exc_type, exc, tb) -> None:
    """Exit the async context manager, ensuring resources are cleaned up."""
    await self.aclose()
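
The lifecycle that __aenter__ and __aexit__ provide can be illustrated without a model or network access. The sketch below uses a hypothetical FakeAgent class that only imitates the three lifecycle hooks shown on this page. It is not DeepAgent itself, and the close_calls counter exists only to make the behaviour observable.

```python
import asyncio


class FakeAgent:
    """Hypothetical stand-in that mimics only the lifecycle hooks."""

    def __init__(self) -> None:
        self.closed = False
        self.close_calls = 0

    async def aclose(self) -> None:
        # A real implementation would release HTTP clients and flush tracing.
        self.close_calls += 1
        self.closed = True

    async def __aenter__(self) -> "FakeAgent":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Returning None lets any exception from the block propagate.
        await self.aclose()


async def demo() -> FakeAgent:
    agent = FakeAgent()
    try:
        async with agent:
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass  # cleanup has already run by the time we get here
    return agent


agent = asyncio.run(demo())
```

The point of the pattern is that aclose() runs even when the body of the async with block raises, so callers do not need their own try/finally.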

__init__(objective, model='gpt-5.2', seed_plan=None, planning_mode='llm', critic_agent=None, supervisor_agent=None, researcher_agent=None, max_steps=20, set_token_budget=None, sub_agents=None, trace=False, checkpoint=False, checkpoint_dir=None, run_from_checkpoint=False, verbose_logging=False)

Initialize a DeepAgent instance.

Parameters:

Name Type Description Default
objective str

The overall objective / task the deep agent is working on.

required
model str | Model

Model identifier or pydantic_ai.models.Model instance to use for all sub-agents. Defaults to "gpt-5.2".

'gpt-5.2'
seed_plan Plan | None

Optional pre-defined pydantask.models.Plan to seed the initial task DAG. If provided, it is loaded into RuntimeState.plan at the start of run().

Notes:

  • Task IDs are respected and used as keys in RuntimeState.plan.
  • RuntimeState.next_task_id is set to max(task_id) + 1.
  • Dependencies are validated to ensure they reference existing tasks.

None
planning_mode Literal['llm', 'fixed', 'hybrid']

Controls whether the supervisor is allowed to modify the plan at runtime.

  • "llm": The supervisor may add/patch tasks.
  • "hybrid": Same as "llm", but starts from a seed_plan that provides an initial DAG the supervisor can extend. Requires seed_plan.
  • "fixed": The supervisor is not given the plan-mutation tools (add_task/patch_task) and can only execute/transition the existing tasks. Requires seed_plan.
'llm'
critic_agent Optional[Agent]

Optional pre-configured critic Agent. If omitted, a default critic agent is created.

None
supervisor_agent Optional[Agent]

Optional supervisor Agent used to manage the task DAG. If omitted, a default dynamic supervisor is created.

None
researcher_agent Optional[Agent]

Optional research Agent. If omitted, a default web/doc research agent is created.

None
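producer_agent

Optional producer Agent. If omitted, a default agent is created. This name is documented in the docstring but is not a parameter of the current __init__ signature.

not in signature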
max_steps int

Maximum number of DeepAgent control-loop iterations to run before forcing termination.

20
set_token_budget Union[int, None]

Optional global token budget for the run. Currently stored but not strictly enforced.

None
sub_agents Union[None, list[CapabilityDescription]]

Additional CapabilityDescription objects to register as callable sub-agents alongside the built-ins.

None
output_type

Pydantic model type used as the default output structure for the producer agent. This parameter is commented out in the current __init__ signature and cannot be passed.

not in signature
trace bool

If True, auto-configure tracing via the configured backend.

False
checkpoint bool

If True, enable event-sourced checkpoint logging for recovery.

False
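checkpoint_dir Path | str | None

Optional directory to reuse for checkpoints when resuming a run. If omitted, a unique directory under _checkpoint/ is created. Passing a directory also turns checkpointing on.

None
run_from_checkpoint bool

If True, replay the events stored in checkpoint_dir when run() is called. Requires checkpoint_dir and turns checkpointing on.

False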
verbose_logging bool

If True, log richer debugging information during execution.

False
Source code in src/pydantask/agents/agent.py
def __init__(
    self,
    objective: str,
    model: str | Model = "gpt-5.2",
    seed_plan: Plan | None = None,
    planning_mode: Literal["llm", "fixed", "hybrid"] = "llm",
    critic_agent: Optional[Agent] = None,
    supervisor_agent: Optional[Agent] = None,
    researcher_agent: Optional[Agent] = None,
    max_steps: int = 20,
    set_token_budget: Union[int, None] = None,
    sub_agents: Union[None, list[CapabilityDescription]] = None,
    # default output type for the producer agent, can be set to a default type or custom pydantic model for better structure and validation of final output
    # output_type: Type = TaskResult,
    # planning_mode: str = "dynamic",  # "static" | "dynamic"
    trace: bool = False,
    checkpoint: bool = False,
    checkpoint_dir: Path | str | None = None,
    run_from_checkpoint: bool = False,
    verbose_logging: bool = False,
):
    """Initialize a DeepAgent instance.

    Args:
        objective: The overall objective / task the deep agent is working on.
        model: Model identifier or ``pydantic_ai.models.Model`` instance to use
            for all sub-agents. Defaults to ``"gpt-5.2"``.
        seed_plan: Optional pre-defined :class:`~pydantask.models.Plan` to seed
            the initial task DAG. If provided, it is loaded into
            :class:`~pydantask.models.RuntimeState.plan` at the start of
            :meth:`run`.

            Notes:
            * Task IDs are respected and used as keys in ``RuntimeState.plan``.
            * ``RuntimeState.next_task_id`` is set to ``max(task_id) + 1``.
            * Dependencies are validated to ensure they reference existing tasks.
        planning_mode: Controls whether the supervisor is allowed to modify the
            plan at runtime.

            * ``"llm"``: The supervisor may add/patch tasks.
            * ``"hybrid"``: Same as ``"llm"``, but starts from a ``seed_plan``
              that provides an initial DAG the supervisor can extend. Requires
              ``seed_plan``.
            * ``"fixed"``: The supervisor is not given the plan-mutation tools
              (``add_task``/``patch_task``) and can only execute/transition the
              existing tasks. Requires ``seed_plan``.
        critic_agent: Optional pre-configured critic ``Agent``. If omitted, a
            default critic agent is created.
        supervisor_agent: Optional supervisor ``Agent`` used to manage the task
            DAG. If omitted, a default dynamic supervisor is created.
        researcher_agent: Optional research ``Agent``. If omitted, a default
            web/doc research agent is created.
        producer_agent: Optional producer ``Agent``. If omitted, a default
            agent is created.
        max_steps: Maximum number of DeepAgent control-loop iterations to run
            before forcing termination.
        set_token_budget: Optional global token budget for the run. Currently
            stored but not strictly enforced.
        sub_agents: Additional ``CapabilityDescription`` objects to register as
            callable sub-agents alongside the built-ins.
        output_type: Pydantic model type used as the default output structure
            for the producer agent.
        trace: If ``True``, auto-configure tracing via the configured backend.
        checkpoint: If ``True``, enable event-sourced checkpoint logging for recovery.
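        checkpoint_dir: Optional directory to reuse for checkpoints when resuming a run.
            If omitted, a unique directory under ``_checkpoint/`` is created.
            Passing a directory also turns checkpointing on.
        run_from_checkpoint: If ``True``, replay the events stored in
            ``checkpoint_dir`` when :meth:`run` is called. Requires
            ``checkpoint_dir`` and turns checkpointing on.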
        verbose_logging: If ``True``, log richer debugging information during
            execution.
    """

    if trace:
        init_tracing_backend(autodetect_tracing_backend())

    # `model` can be either:
    #   - a pydantic_ai Model instance (fully custom)
    #   - a bare model name (defaults to OpenAI), e.g. "gpt-4.1-mini"
    #   - a provider-prefixed string, e.g. "openai:gpt-4.1-mini" or "anthropic:claude-sonnet-4-5"
    self.model_name: str = (
        model if isinstance(model, str) else model.__class__.__name__
    )

    if objective is None:
        raise TypeError("DeepAgent requires 'objective' to be provided")

    if planning_mode in {"fixed", "hybrid"} and seed_plan is None:
        raise ValueError(
            "seed_plan must be provided when planning_mode is 'fixed' or 'hybrid'"
        )

    self.objective: str = objective
    self._max_steps: int = max_steps  # Max steps to prevent infinite loops
    self.token_budget: Union[int, None] = set_token_budget
    self.verbose = verbose_logging
    # self.output_type = output_type
    self.planning_mode = planning_mode
    self.seed_plan: Union[Plan, None] = seed_plan
    self._retry_client = self._create_retrying_client()

    # Checkpointing / resume semantics:
    # - `checkpoint=True` enables writing events.
    # - `checkpoint_dir=...` forces checkpointing on and chooses the directory.
    # - `run_from_checkpoint=True` requires `checkpoint_dir` and will replay
    #   events from that directory on `run()`.
    if run_from_checkpoint and checkpoint_dir is None:
        raise ValueError(
            "checkpoint_dir must be provided when run_from_checkpoint=True"
        )

    if checkpoint_dir is not None or run_from_checkpoint:
        checkpoint = True

    self.checkpoint = checkpoint
    self.run_from_checkpoint = run_from_checkpoint

    # Concurrency guardrails:
    # - `_plan_lock` protects plan-level mutations and task claiming (READY->RUNNING).
    self._plan_lock = asyncio.Lock()

    self.checkpoint_path: Path | None = None
    self._checkpoint_recorder: CheckpointRecorder | None = None
    if self.checkpoint:
        self.checkpoint_path = (
            Path(checkpoint_dir)
            if checkpoint_dir is not None
            else Path(f"_checkpoint/{uuid.uuid4()}/")
        )
        self.checkpoint_path.mkdir(parents=True, exist_ok=True)
        self._checkpoint_recorder = CheckpointRecorder(self.checkpoint_path)

    # Build the shared model used by all sub-agents.
    # TODO: Allow configuring which model to use per capability.
    # We inject the retrying httpx client into the provider for durability.
    self._retry_model = self._build_model(model)

    # NOTE: Filesystem tools exist in `pydantask.tools.default_tools`, but are not
    # enabled by default. The harness is currently in-memory focused.
    self._critic_agent = critic_agent or Agent(
        model=self._retry_model,
        name="_default_Critic_Agent",
        system_prompt=CRITIC_SYS_PROMPT,
        output_type=TaskQAResult,
        deps_type=RuntimeState,
        tools=[get_current_datetime, think_tool],
        # end_strategy="exhaustive",
    )

    self._supervisor_agent = supervisor_agent or Agent(
        model=self._retry_model,
        name="_dynamic_Supervisor_Agent",
        system_prompt=DYNAMIC_SUPERVISOR_SYS_PROMPT,
        output_type=SupervisorDecision,
        deps_type=RuntimeState,
        tools=self._default_supervisor_tools(),
        end_strategy="exhaustive",
    )

    # TODO: rework some of these tools
    tavily_api_key = os.getenv("TAVILY_API_KEY", None)

    _default_research_tool_set = [
        think_tool,
        append_scratch_note,
        read_scratch_notes,
        get_current_datetime,
    ]

    if not tavily_api_key:
        logger.info(
            "Tavily API key not found. Defaulting to the built-in DuckDuckGo search tool."
        )
        _default_research_tool_set.append(duckduckgo_search_tool())
    else:
        _default_research_tool_set.append(tavily_search_tool(tavily_api_key))

    self._researcher_agent = researcher_agent or Agent(
        model=self._retry_model,
        name="_default_Research_Agent",
        system_prompt=RESEARCH_AGENT_SYS_PROMPT,
        tools=_default_research_tool_set,
        deps_type=TaskRunDeps,
        output_type=TaskResult,
    )

    self._capability_registry = self._setup_capabilities(
        additonal_capabilities=sub_agents
    )

    # Scheduler/system notes injected into the next supervisor prompt.
    self._last_scheduler_report: str = ""

aclose() async

Close underlying resources used by this DeepAgent instance.

This is primarily responsible for flushing any tracing backends and closing the shared async HTTP client used by the model providers. Safe to call multiple times.

Source code in src/pydantask/agents/agent.py
async def aclose(self) -> None:
    """Close underlying resources used by this ``DeepAgent`` instance.

    This is primarily responsible for flushing any tracing backends and
    closing the shared async HTTP client used by the model providers.
    Safe to call multiple times.
    """
    try:
        # best-effort; safe to call even if tracing is disabled
        flush_tracing()
    finally:
        if getattr(self, "_retry_client", None) is not None:
            await self._retry_client.aclose()

add_task(ctx, sub_task_objective, capability, dependencies=None, metadata=None) async

Tool: Add Task.

Note: In planning_mode="fixed" this tool is not registered on the supervisor agent, but it may still be called directly in Python.

Create and register a new TaskItem in the current plan/DAG when more work is required to achieve the overall objective.

The supervisor should specify any upstream dependencies so that execution order can be enforced.

Parameters:

Name Type Description Default
ctx RunContext[RuntimeState]

RunContext carrying the current RuntimeState.

required
sub_task_objective str

Natural-language objective for the new task.

required
capability str

Name of the capability / sub-agent that should execute this task.

required
dependencies list[int] | None

Optional list of task IDs that must complete successfully before this task can run.

None
metadata dict | None

Optional free-form metadata dictionary attached to the task.

None

Returns:

Type Description
int

The integer task_id assigned to the newly created task.

Source code in src/pydantask/agents/agent.py
async def add_task(
    self,
    ctx: RunContext[RuntimeState],
    sub_task_objective: str,
    capability: str,
    dependencies: list[int] | None = None,
    metadata: dict | None = None,
) -> int:
    """Tool: Add Task.

    Note: In ``planning_mode="fixed"`` this tool is not registered on the
    supervisor agent, but it may still be called directly in Python.

    Create and register a new ``TaskItem`` in the current plan/DAG when
    more work is required to achieve the overall objective.

    The supervisor should specify any upstream dependencies so that
    execution order can be enforced.

    Args:
        ctx: ``RunContext`` carrying the current ``RuntimeState``.
        sub_task_objective: Natural-language objective for the new task.
        capability: Name of the capability / sub-agent that should execute
            this task.
        dependencies: Optional list of task IDs that must complete
            successfully before this task can run.
        metadata: Optional free-form metadata dictionary attached to the task.

    Returns:
        The integer ``task_id`` assigned to the newly created task.
    """
    async with self._plan_lock:
        plan = ctx.deps.plan
        new_id = ctx.deps.next_task_id
        ctx.deps.next_task_id += 1

        task = TaskItem(
            task_id=new_id,
            overall_objective=ctx.deps.objective,
            sub_task_objective=sub_task_objective,
            capability=capability,
            sub_task_dependencies=dependencies or [],
            metadata=metadata or {},
            status=TaskStatus.READY,
        )
        plan[new_id] = task
        self._record_event(
            "task_added",
            {
                "task": task.model_dump(),
                "next_task_id": ctx.deps.next_task_id,
            },
        )
        return new_id

cancel_task(ctx, task_id, reason) async

Tool: Cancel Task.

Mark a task as CANCELLED when it is no longer relevant or when a failure in an upstream dependency makes it impossible to complete.

Parameters:

Name Type Description Default
ctx RunContext[RuntimeState]

RunContext carrying the current RuntimeState.

required
task_id int

Identifier of the task to cancel.

required
reason str

Human-readable explanation for the cancellation.

required
Source code in src/pydantask/agents/agent.py
async def cancel_task(
    self, ctx: RunContext[RuntimeState], task_id: int, reason: str
):
    """Tool: Cancel Task.

    Mark a task as ``CANCELLED`` when it is no longer relevant or when
    a failure in an upstream dependency makes it impossible to complete.

    Args:
        ctx: ``RunContext`` carrying the current ``RuntimeState``.
        task_id: Identifier of the task to cancel.
        reason: Human-readable explanation for the cancellation.
    """
    async with self._plan_lock:
        if task_id in ctx.deps.plan:
            task = ctx.deps.plan[task_id]
            # Instead of deleting, mark as CANCELLED to keep history
            task.status = TaskStatus.CANCELLED
            task.error_msg = reason
            self._record_task_status_event(
                task_id,
                TaskStatus.CANCELLED,
                reason=reason,
                error_msg=reason,
            )
            return f"Task {task_id} cancelled. Reason: {reason}"
        return f"Error: Task {task_id} not found."

execute(sub_agent, step, runtime_state) async

Execute a sub-agent for a single task and record the result.

Builds a task-specific prompt (with optional supervisor feedback), runs the provided sub_agent, and updates the TaskItem status and result based on success or failure.

Source code in src/pydantask/agents/agent.py
    @traced(run_type="task", capture_input=False)
    @retry(wait=wait_exponential_jitter(), reraise=True, stop=stop_after_attempt(3))
    async def execute(
        self, sub_agent: Agent, step: TaskItem, runtime_state: RuntimeState
    ) -> TaskItem:
        """Execute a sub-agent for a single task and record the result.

        Builds a task-specific prompt (with optional supervisor feedback),
        runs the provided ``sub_agent``, and updates the ``TaskItem`` status
        and result based on success or failure.
        """

        # check to see if there was feedback or additional instructions for the task from the supervisor
        _feedback_for_agent = None
        if isinstance(step.parameters, dict):
            _feedback_for_agent = step.parameters.get("supervisor_feedback")

        if step.capability == "producer_agent":

            user_prompt = f"""
            Overall objective:
            {self.objective}

            You are the final synthesis agent.
            - First, call `list_completed_tasks` to see all completed upstream tasks.
            - For each task that is relevant to the objective (especially research tasks), call `get_task_result(task_id=...)`.
            - THEN, write a single, coherent comparative analysis answering the objective.
            - You MUST explicitly integrate evidence from ALL relevant completed tasks (e.g. Task 1 and Task 2 in this run).
            """

            if _feedback_for_agent:

                user_prompt += f"""

                    Supervisor feedback / additional instructions for this execution:

                    {_feedback_for_agent}
                    """

            user_prompt += """
                    Your job:
                    - Use ONLY the completed sub-task results from this run.
                    - Combine their findings into a single, coherent final answer.
                    - Follow your system prompt instructions for citations and final TaskResult structure.
                    - Do NOT request new research or create new sub-tasks.
                    """
        else:
            user_prompt = f"""
                You are executing TaskItem:

            {step.model_dump_json(indent=2)}

                Overall objective:
                {self.objective}

                """
            if _feedback_for_agent:
                user_prompt += f"""

                Supervisor feedback / additional instructions for this execution:
                {_feedback_for_agent}
                """

            user_prompt += """

            ONLY act on this sub-task and any feedback. Do not re-plan or change the task.
            """
        task_deps = TaskRunDeps(runtime_state=runtime_state, task=step)

        # Help smaller-context models avoid blowing up in a single long tool-run.
        # This doesn't guarantee safety (tool output can still be large), but combined
        # with truncated tool outputs and scratch checkpoints it greatly improves durability.
        user_prompt += f"""

Context-budget note:
- You may be running on a smaller-context model.
- Prefer small tool outputs. When calling tools that can return large text, request truncation.
- Checkpoint progress frequently via `append_scratch_note(task_id={step.task_id}, note=...)`.
"""

        max_resume_attempts = 2
        last_error: Exception | None = None

        for resume_attempt in range(max_resume_attempts + 1):
            tool_call_limit = 20 if resume_attempt == 0 else 10

            try:
                result = await sub_agent.run(
                    user_prompt,
                    deps=task_deps,
                    usage_limits=UsageLimits(tool_calls_limit=tool_call_limit),
                )
                step.result = result.output
                step.status = TaskStatus.NEEDS_REVIEW
                step.error_msg = None
                self._record_task_result(step)
                self._record_task_status_event(step.task_id, TaskStatus.NEEDS_REVIEW)
                return step
            except Exception as e:
                last_error = e
                if (
                    self._is_context_limit_error(e)
                    and resume_attempt < max_resume_attempts
                ):
                    # Record the incident and attempt a "fresh run" using scratch checkpoints.
                    overflow_entry = {
                        "at": datetime.now().isoformat(),
                        "attempt": resume_attempt,
                        "error": str(e),
                    }
                    step.metadata.setdefault("context_overflow", [])
                    step.metadata["context_overflow"].append(overflow_entry)
                    self._record_metadata_append(
                        step.task_id, "context_overflow", overflow_entry
                    )

                    # Build a minimal prompt to continue from checkpoint notes.
                    user_prompt = self._build_resume_prompt(step, e)
                    continue

                step.status = TaskStatus.ERRORED
                step.error_msg = str(e)
                self._record_task_status_event(
                    step.task_id,
                    TaskStatus.ERRORED,
                    error_msg=step.error_msg,
                )
                return step

        # Should be unreachable, but keep a safe fallback.
        step.status = TaskStatus.ERRORED
        step.error_msg = str(last_error) if last_error else "Unknown error"
        self._record_task_status_event(
            step.task_id, TaskStatus.ERRORED, error_msg=step.error_msg
        )
        return step
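
The resume loop in execute can be reduced to a small standalone sketch: the first attempt gets a tool-call budget of 20, later attempts get 10, and a context-limit error swaps the prompt for a shorter resume prompt. `ContextOverflow`, `run_with_resume`, and `flaky` are hypothetical names; unlike the method above, this sketch re-raises on the last attempt instead of marking the task ERRORED.

```python
import asyncio


class ContextOverflow(Exception):
    """Hypothetical error standing in for a provider context-limit error."""


async def run_with_resume(run_once, max_resume_attempts: int = 2):
    """Call run_once(prompt, limit); on overflow, retry with a resume prompt."""
    prompt = "full prompt"
    limits_used = []
    for attempt in range(max_resume_attempts + 1):
        limit = 20 if attempt == 0 else 10  # smaller budget on resumed runs
        limits_used.append(limit)
        try:
            return await run_once(prompt, limit), limits_used
        except ContextOverflow:
            if attempt == max_resume_attempts:
                raise
            # Continue from checkpoint notes rather than the full prompt.
            prompt = "resume from scratch notes"


calls = []


async def flaky(prompt: str, limit: int) -> str:
    # Fails once with an overflow, then succeeds.
    calls.append(prompt)
    if len(calls) == 1:
        raise ContextOverflow("too many tokens")
    return "ok"


output, limits = asyncio.run(run_with_resume(flaky))
```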

handle_critic_result(task, review)

Apply the critic's QA result to a task and emit checkpoint events.

Source code in src/pydantask/agents/agent.py
def handle_critic_result(self, task: TaskItem, review: TaskQAResult):
    """Apply the critic's QA result to a task and emit checkpoint events."""
    task.attempt_count += 1
    task.task_feedback = review

    self._record_event(
        "critic_feedback",
        {
            "task_id": task.task_id,
            "feedback": review.model_dump(),
            "attempt_count": task.attempt_count,
        },
    )

    if review.passed:
        task.status = TaskStatus.COMPLETED
        task.error_msg = None
        self._record_task_status_event(task.task_id, task.status)
        return

    if task.attempt_count >= task.max_attempts:
        task.status = TaskStatus.FAILED
        task.error_msg = (
            f"Max retries reached ({task.attempt_count}/{task.max_attempts})."
        )
        self._record_task_status_event(
            task.task_id, task.status, error_msg=task.error_msg
        )
        return

    task.status = TaskStatus.RERUN
    task.error_msg = None
    task.sub_task_objective = f"{task.sub_task_objective}\n\nPrevious attempt failed review; feedback: {review.reasoning}"
    self._record_task_status_event(task.task_id, task.status)
    self._record_event(
        "task_patched",
        {
            "task_id": task.task_id,
            "sub_task_objective": task.sub_task_objective,
        },
    )
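
The status transition applied by handle_critic_result depends only on whether the review passed and on the attempt counters. A minimal sketch of that decision, assuming the default max_attempts of 3; `Status` and `next_status` are hypothetical names redeclared here so the example runs standalone.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical subset of TaskStatus."""
    COMPLETED = "completed"
    FAILED = "failed"
    RERUN = "rerun"


def next_status(passed: bool, attempt_count: int, max_attempts: int = 3) -> Status:
    """attempt_count is the value after incrementing for the current review."""
    if passed:
        return Status.COMPLETED
    if attempt_count >= max_attempts:
        return Status.FAILED
    return Status.RERUN


# Three consecutive failed reviews: two reruns, then a hard failure.
outcomes = [next_status(False, n) for n in (1, 2, 3)]
```

A passing review always wins, even on the last allowed attempt, because the passed check comes first.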

mark_final_task(ctx, task_id, reason=None) async

Tool: Mark Final Task.

Mark exactly one task as the final deliverable for the run.

This tool should be called by the supervisor (planner/orchestrator), not by workers. It enables deterministic "final_result" selection on resume.

The invariant enforced is: at most one task has is_final=True.

Source code in src/pydantask/agents/agent.py
async def mark_final_task(
    self,
    ctx: RunContext[RuntimeState],
    task_id: int,
    reason: str | None = None,
) -> str:
    """Tool: Mark Final Task.

    Mark exactly one task as the final deliverable for the run.

    This tool should be called by the supervisor (planner/orchestrator), not
    by workers. It enables deterministic "final_result" selection on resume.

    The invariant enforced is: at most one task has `is_final=True`.
    """
    async with self._plan_lock:
        if task_id not in ctx.deps.plan:
            return f"Error: No task with id {task_id} found in plan."

        for t in ctx.deps.plan.values():
            t.is_final = False

        ctx.deps.plan[task_id].is_final = True

        payload: Dict[str, Any] = {"task_id": task_id}
        if reason:
            payload["reason"] = reason
        self._record_event("final_task_set", payload)

        return f"Task {task_id} marked as final."
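
The "at most one final task" invariant is kept by clearing every flag before setting the new one. A small sketch of that behaviour, using a hypothetical `SketchTask` dataclass and a plain dict in place of the real plan:

```python
from dataclasses import dataclass


@dataclass
class SketchTask:
    """Hypothetical stand-in for TaskItem with only the fields needed here."""
    task_id: int
    is_final: bool = False


def mark_final(plan: dict[int, SketchTask], task_id: int) -> str:
    if task_id not in plan:
        # Unknown IDs leave the existing final marker untouched.
        return f"Error: No task with id {task_id} found in plan."
    for task in plan.values():
        task.is_final = False
    plan[task_id].is_final = True
    return f"Task {task_id} marked as final."


plan = {i: SketchTask(i) for i in (1, 2, 3)}
mark_final(plan, 1)
mark_final(plan, 3)  # moves the marker from task 1 to task 3
missing = mark_final(plan, 99)
final_ids = [t.task_id for t in plan.values() if t.is_final]
```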

patch_task(ctx, task_id, sub_task_objective=None, dependencies=None) async

Tool: Patch Task.

Note: In planning_mode="fixed" this tool is not registered on the supervisor agent, but it may still be called directly in Python.

Update an existing task's objective and/or dependency list in-place.

Parameters:

Name Type Description Default
ctx RunContext[RuntimeState]

RunContext carrying the current RuntimeState.

required
task_id int

Identifier of the task to modify.

required
sub_task_objective Optional[str]

New sub-task objective, if changing.

None
dependencies Optional[List[int]]

Updated list of dependency IDs, if changing.

None
Source code in src/pydantask/agents/agent.py
async def patch_task(
    self,
    ctx: RunContext[RuntimeState],
    task_id: int,
    sub_task_objective: Optional[str] = None,
    dependencies: Optional[List[int]] = None,
):
    """Tool: Patch Task.

    Note: In ``planning_mode="fixed"`` this tool is not registered on the
    supervisor agent, but it may still be called directly in Python.

    Update an existing task's objective and/or dependency list in-place.

    Args:
        ctx: ``RunContext`` carrying the current ``RuntimeState``.
        task_id: Identifier of the task to modify.
        sub_task_objective: New sub-task objective, if changing.
        dependencies: Updated list of dependency IDs, if changing.
    """
    async with self._plan_lock:
        task = ctx.deps.plan.get(task_id)
        if not task:
            return "Task not found."

        payload: Dict[str, Any] = {"task_id": task_id}

        if sub_task_objective:
            task.sub_task_objective = sub_task_objective
            payload["sub_task_objective"] = task.sub_task_objective
        if dependencies is not None:
            task.sub_task_dependencies = dependencies
            payload["dependencies"] = task.sub_task_dependencies

        if len(payload) > 1:
            self._record_event("task_patched", payload)

        return f"Task {task_id} updated successfully."
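
One detail of patch_task is easy to miss: the objective is updated only when it is truthy, while the dependency list is updated whenever it is not None. An empty string therefore leaves the objective alone, but an empty list clears the dependencies. The sketch below mirrors just those two conditions on a plain dict; `apply_patch` is a hypothetical helper, not a Pydantask function.

```python
def apply_patch(task: dict, sub_task_objective=None, dependencies=None) -> dict:
    """Apply the same two update rules as patch_task and return what changed."""
    changed = {}
    if sub_task_objective:  # truthiness check: "" is ignored
        task["sub_task_objective"] = sub_task_objective
        changed["sub_task_objective"] = sub_task_objective
    if dependencies is not None:  # identity check: [] is applied
        task["sub_task_dependencies"] = dependencies
        changed["dependencies"] = dependencies
    return changed


task = {"sub_task_objective": "Draft summary", "sub_task_dependencies": [1, 2]}
changed = apply_patch(task, sub_task_objective="", dependencies=[])
```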

run() async

Run the full DeepAgent control loop until completion or max steps.

If a seed_plan was supplied at construction time, it is loaded into the runtime state before the supervisor loop begins.

This method repeatedly:

  • Invokes the supervisor to decide which tasks to execute next.
  • Executes ready tasks in parallel via their associated sub-agents.
  • Sends results to the critic for QA and status updates.
  • Optionally checkpoints state between iterations.

Returns:

Type Description
DeepAgentRunResult

A DeepAgentRunResult containing the final output, the full plan, and the final RuntimeState.

Source code in src/pydantask/agents/agent.py
@traced()
async def run(self) -> DeepAgentRunResult:
    """Run the full DeepAgent control loop until completion or max steps.

    If a ``seed_plan`` was supplied at construction time, it is loaded into the
    runtime state before the supervisor loop begins.

    This method repeatedly:

    * Invokes the supervisor to decide which tasks to execute next.
    * Executes ready tasks in parallel via their associated sub-agents.
    * Sends results to the critic for QA and status updates.
    * Optionally checkpoints state between iterations.

    Returns:
        A ``DeepAgentRunResult`` containing the final output, the full plan,
        and the final ``RuntimeState``.
    """
    runtime_state = self._initialize_runtime_state(
        objective=self.objective, registry=self._capability_registry
    )
    self._apply_seed_plan(runtime_state)
    self._replay_checkpoint(runtime_state)

    errors: list[str] = []
    no_progress_cycles = 0

    step_count = 0
    stop_execution = False
    while step_count < self._max_steps and not stop_execution:

        logger.info(f"\n--- DeepAgent Cycle {step_count} ---")

        # Deterministic scheduler pass to normalize readiness and surface issues.
        self._last_scheduler_report = await self._scheduler_pass(runtime_state)

        current_instruction = BOOTSTRAP_INSTURCT if len(runtime_state.plan) == 0 else ORCHESTRATION_INSTRUCT
        supervisor_response = await self._supervisor_agent.run(
            self._format_supervisor_input_prompt(runtime_state),
            deps=runtime_state,
            instructions=current_instruction
        )
        supervisor_response = supervisor_response.output

        self._record_event(
            "supervisor_decision",
            supervisor_response.model_dump(),
        )

        if supervisor_response.all_tasks_completed:
            # Deterministic guardrail: do not allow "completion" unless the
            # task marked as final is actually COMPLETED.
            final_tasks = [
                t for t in runtime_state.plan.values() if getattr(t, "is_final", False)
            ]

            completion_ok = True
            reasons: list[str] = []

            if not final_tasks:
                completion_ok = False
                reasons.append("no task is marked Final: True")
            elif len(final_tasks) > 1:
                completion_ok = False
                reasons.append(
                    f"multiple tasks are marked Final: True ({[t.task_id for t in final_tasks]})"
                )
            else:
                ft = final_tasks[0]
                if ft.status != TaskStatus.COMPLETED:
                    completion_ok = False
                    reasons.append(
                        f"final task {ft.task_id} status is {ft.status.value!r} (expected 'completed')"
                    )
                if ft.result is None:
                    completion_ok = False
                    reasons.append(f"final task {ft.task_id} has no TaskResult")

            if not completion_ok:
                msg = (
                    "Supervisor returned all_tasks_completed=True, but completion invariants "
                    f"are not met: {', '.join(reasons)}. Overriding to continue."
                )
                logger.warning(msg)
                self._last_scheduler_report = (
                    self._last_scheduler_report
                    + "\n\nCOMPLETION OVERRIDE (deterministic):\n- "
                    + msg
                ).strip()

                # Treat this as a no-progress cycle so we eventually fail-safe.
                no_progress_cycles += 1
                if self.checkpoint:
                    self._checkpoint_state(runtime_state)
                runtime_state.runtime_steps += 1
                step_count += 1
                continue

            logger.info(
                "--- Supervisor declared completion and final task is completed. Ending execution loop. ---"
            )
            stop_execution = True
            break

        logger.info("--- Executing Tasks ---")
        # execute tasks that are ready to run and await responses
        task_results = await self._execute_ready_tasks(
            supervisor_response, runtime_state
        )

        # NOTE: `execute(...)` mutates the canonical TaskItem stored in `runtime_state.plan`
        # in-place (it receives the same object reference). Do NOT overwrite
        # `runtime_state.plan[task_id]` with returned TaskItems here; that can clobber
        # concurrent metadata updates (e.g. scratch notes/checkpoints).

        if len(task_results) == 0:
            # No tasks ran this cycle. This is not necessarily terminal in a
            # dynamic planner: we may be blocked on deps, have errored tasks
            # that need patching, or need the supervisor to add new nodes.
            no_progress_cycles += 1
            deadlock = self._build_deadlock_report(
                runtime_state, supervisor_response
            )
            self._last_scheduler_report = (
                self._last_scheduler_report + "\n\n" + deadlock
            ).strip()

            logger.info(
                f"No executable tasks this cycle (no_progress_cycles={no_progress_cycles}). Continuing."
            )

            # Prevent infinite loops if the supervisor cannot make progress.
            if no_progress_cycles >= 3:
                msg = (
                    "No progress after 3 consecutive cycles (no tasks executed). "
                    "Stopping to avoid infinite loop. "
                    "See SYSTEM SCHEDULER NOTES in the final cycle for details."
                )
                logger.warning(msg)
                errors.append(msg)
                stop_execution = True

            if self.checkpoint:
                self._checkpoint_state(runtime_state)

            runtime_state.runtime_steps += 1
            step_count += 1
            continue

        no_progress_cycles = 0

        logger.info(f"Number of tasks executed: {len(task_results)}")
        # go through responses and evaluate if they have completed the task
        for task_result in task_results or []:
            logger.info(f"--- Evaluating Task Result for {task_result.task_id} ---")

            qa_response = await self._critic_agent.run(
                self._format_critic_input_prompt(task_result, runtime_state),
                deps=runtime_state,
            )
            qa_response = qa_response.output
            if self.verbose:
                logger.info("--- QA Response ---")
                logger.info(qa_response.model_dump_json(indent=2))

            task = runtime_state.plan[task_result.task_id]

            # deterministic transition based on critic
            self.handle_critic_result(task, qa_response)

        if self.checkpoint:
            self._checkpoint_state(runtime_state)

        runtime_state.runtime_steps += 1
        step_count += 1

    return_result = DeepAgentRunResult(
        objective=self.objective,
        final_result=self._select_final_result(runtime_state),
        plan=runtime_state.plan,
        runtime_state=runtime_state,
        errors=errors,
    )

    return return_result
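
The completion guardrail inside run() can be read as a pure function over the plan: a "done" claim is accepted only when exactly one task is marked final, that task is completed, and it has a result. The sketch below restates that check on plain dicts; `completion_problems` and the dict layout are assumptions for illustration.

```python
def completion_problems(plan: list[dict]) -> list[str]:
    """Return the reasons a 'done' claim should be rejected (empty list = OK)."""
    final_tasks = [t for t in plan if t.get("is_final")]
    if not final_tasks:
        return ["no task is marked final"]
    if len(final_tasks) > 1:
        ids = [t["task_id"] for t in final_tasks]
        return [f"multiple tasks are marked final ({ids})"]

    final = final_tasks[0]
    problems = []
    if final["status"] != "completed":
        problems.append(f"final task {final['task_id']} status is {final['status']!r}")
    if final.get("result") is None:
        problems.append(f"final task {final['task_id']} has no result")
    return problems


plan = [
    {"task_id": 1, "status": "completed", "result": "notes", "is_final": False},
    {"task_id": 2, "status": "needs_review", "result": None, "is_final": True},
]
before = completion_problems(plan)
plan[1].update(status="completed", result="final report")
after = completion_problems(plan)
```

In the real loop a rejected claim also counts as a no-progress cycle, so a supervisor that keeps declaring completion prematurely eventually hits the three-cycle stop.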

update_task_status(ctx, task_id, status) async

Tool: Update Task Status.

Primarily used by the supervisor to transition a task between states (e.g. to READY or COMPLETED) once dependencies are met or QA has passed.

Parameters:

Name Type Description Default
ctx RunContext[RuntimeState]

RunContext carrying the current RuntimeState.

required
task_id int

Identifier of the task to update.

required
status TaskStatus

New TaskStatus value for the task.

required
Source code in src/pydantask/agents/agent.py
async def update_task_status(
    self, ctx: RunContext[RuntimeState], task_id: int, status: TaskStatus
):
    """Tool: Update Task Status.

    Primarily used by the supervisor to transition a task between states
    (e.g. to ``READY`` or ``COMPLETED``) once dependencies are met or QA
    has passed.

    Args:
        ctx: ``RunContext`` carrying the current ``RuntimeState``.
        task_id: Identifier of the task to update.
        status: New :class:`TaskStatus` value for the task.
    """
    async with self._plan_lock:
        if task_id in ctx.deps.plan:
            task = ctx.deps.plan.get(task_id)
            if task is not None:
                task.status = status
                self._record_task_status_event(task_id, status)
            return f"Status for {task_id} is now {status}."
        return f"Error: No task with {task_id} found in plan. Be sure task_id actually exists."

view_qa_report(ctx, task_id) async

Tool: View QA Report.

Return the full serialized QA report for a specific task, if one is available. This is typically called by the supervisor when additional inspection of the critic's reasoning is required.

Parameters:

Name Type Description Default
ctx RunContext[RuntimeState]

RunContext carrying the current RuntimeState.

required
task_id int

Identifier of the task whose QA report should be viewed.

required

Returns:

Type Description
str

A JSON-formatted string representation of the stored TaskQAResult, or a message describing why no report is available.

Source code in src/pydantask/agents/agent.py
async def view_qa_report(self, ctx: RunContext[RuntimeState], task_id: int) -> str:
    """Tool: View QA Report.

    Return the full serialized QA report for a specific task, if one is
    available. This is typically called by the supervisor when additional
    inspection of the critic's reasoning is required.

    Args:
        ctx: ``RunContext`` carrying the current ``RuntimeState``.
        task_id: Identifier of the task whose QA report should be viewed.

    Returns:
        A JSON-formatted string representation of the stored
        :class:`TaskQAResult`, or a message describing why no report is
        available.
    """
    async with self._plan_lock:
        task = ctx.deps.plan.get(task_id)
        logger.info(f"Checking QA Report for task: {task_id}")
        if task is None:
            return f"No task with id {task_id}."

        fb = getattr(task, "task_feedback", None)
        if fb is None:
            return f"No QA feedback found for task {task_id}."
        task.metadata.setdefault("qa", {})
        task.metadata["qa"]["report_viewed"] = True

        # Return either a summary or full JSON depending on your needs
        return fb.model_dump_json(indent=2)

Core Models

These models define the task/plan structure, runtime state, and capability descriptions used by DeepAgent and sub‑agents.

Task and Plan Models

TaskStatus

Bases: Enum

Lifecycle state for a TaskItem within a DeepAgent plan.

Values

PENDING: Waiting for dependencies to complete.

READY: All dependencies met; eligible to run.

RUNNING: Currently being executed by a sub-agent.

COMPLETED: Successfully finished and accepted.

ERRORED: Execution error occurred (tools, runtime, etc.).

FAILED: Evaluator/critic rejected the result.

NEEDS_REVIEW: Needs evaluator/supervisor review.

RERUN: Marked to be re-executed with revised instructions.

CANCELLED: Task that is no longer needed or relevant to complete the objective.

Source code in src/pydantask/models/models.py
class TaskStatus(Enum):
    """Lifecycle state for a :class:`TaskItem` within a DeepAgent plan.

    Values:
        PENDING: Waiting for dependencies to complete. \n
        READY: All dependencies met; eligible to run. \n
        RUNNING: Currently being executed by a sub-agent. \n
        COMPLETED: Successfully finished and accepted. \n
        ERRORED: Execution error occurred (tools, runtime, etc.). \n
        FAILED: Evaluator/critic rejected the result. \n
        NEEDS_REVIEW: Needs evaluator/supervisor review. \n
        RERUN: Marked to be re-executed with revised instructions. \n
        CANCELLED: Task that is no longer needed or relevant to complete the objective. \n
    """

    PENDING = "pending"  # Waiting for dependencies
    READY = "ready"  # Dependencies met, can run now
    RUNNING = "running"  # Currently being executed
    COMPLETED = "completed"
    ERRORED = "errored"  # Execution error occurred
    FAILED = "failed"  # Evaluator rejected it
    NEEDS_REVIEW = "needs_review"  # Needs Evaluator review
    RERUN = "rerun"
    CANCELLED = "cancelled"
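
Because each member carries a lowercase string value, a status stored as text (for example in a checkpoint or log) can be turned back into a member with standard Enum lookup. The sketch copies three members from the listing above so it runs without Pydantask installed.

```python
from enum import Enum


class TaskStatus(Enum):
    """Copy of three members from the listing above, for a standalone example."""
    PENDING = "pending"
    READY = "ready"
    NEEDS_REVIEW = "needs_review"


restored = TaskStatus("needs_review")  # lookup by value
by_name = TaskStatus["READY"]          # lookup by member name
```

Lookup by an unknown value raises ValueError, which is a convenient guard against typos in stored status strings.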

TaskItem

Bases: BaseModel

One sub-task in a DeepAgent plan.

Created by the planner and then updated over the life of the run as it moves through TaskStatus states and accumulates results and feedback.

Attributes:

Name Type Description
task_id int

Unique integer task identifier.

overall_objective str

The overall objective this task contributes to.

sub_task_objective str

The specific objective for this sub-task.

status TaskStatus

Current lifecycle state of the task.

result Optional[TaskResult]

The TaskResult produced by the assigned capability, if the task has been executed.

capability str

Name of the capability (sub-agent) that should execute this task (e.g. "research_agent", "producer_agent").

sub_task_dependencies Optional[List[int]]

IDs of tasks that must be COMPLETED before this task is eligible to run.

task_feedback Optional[TaskQAResult]

Latest TaskQAResult from the critic/QA agent.

error_msg Optional[str]

Any error message associated with this task.

iteration_history List

Optional history of prior attempts/outputs.

time_scope Optional[str]

Optional temporal scope string ("2026", "last 7 days", etc.).

parameters dict

Arbitrary structured parameters (e.g. supervisor feedback).

attempt_count int

How many times this task has been attempted.

max_attempts int

Maximum times this task is allowed to be attempted.

metadata dict

Optional free-form metadata for this task.

Source code in src/pydantask/models/models.py
class TaskItem(BaseModel):
    """One sub-task in a DeepAgent plan.

    Created by the planner and then updated over the life of the run as it moves
    through :class:`TaskStatus` states and accumulates results and feedback.

    Attributes:
        task_id: Unique integer task identifier. Integer value.
        overall_objective: The overall objective this task contributes to.
        sub_task_objective: The specific objective for this sub-task.
        status: Current lifecycle state of the task.
        result: The :class:`TaskResult` produced by the assigned capability, if
            the task has been executed.
        capability: Name of the capability (sub-agent) that should execute this
            task (e.g. "research_agent", "producer_agent").
        sub_task_dependencies: IDs of tasks that must be COMPLETED before this
            task is eligible to run.
        task_feedback: Latest :class:`TaskQAResult` from the critic/QA agent.
        error_msg: Any error message associated with this task.
        iteration_history: Optional history of prior attempts/outputs.
        time_scope: Optional temporal scope string ("2026", "last 7 days", etc.).
        parameters: Arbitrary structured parameters (e.g. supervisor feedback).
        attempt_count: How many times this task has been attempted.
        max_attempts: Maximum times this task is allowed to be attempted.
        metadata: Optional free-form metadata for this task.
    """

    task_id: int = Field(description="Unique task id. Should be an integer")
    overall_objective: str = Field(
        description="The overall objective this task is contributing to solving."
    )
    sub_task_objective: str = Field(
        description="The sub task objective that must be solved for."
    )
    status: TaskStatus
    result: Optional[TaskResult] = Field(
        description="Where to put the task result if completed.", default=None
    )
    capability: str = Field(
        description="Which sub agent capability should attempt this task."
    )
    sub_task_dependencies: Optional[List[int]] = Field(
        description="Put task_id dependency IDs here", default_factory=list
    )
    task_feedback: Optional[TaskQAResult] = None  # Store the Eval "critique" here
    error_msg: Optional[str] = Field(
        default=None,
        description="Any errors that happened during the running of this task. Only store the most recent error message.",
    )  # Store any error messages here
    iteration_history: List = Field(
        default_factory=list,
        description="Store any answer history if multiple attempts are made.",
    )  # Store any answer history if multiple attempts are made
    time_scope: Optional[str] = Field(
        default=None, description="2026, 2021-2025, two days ago"
    )  # "2026", "2025-2026", "last 7 days", etc.
    parameters: dict = Field(
        default_factory=dict
    )  # you can stash structured temporal params here
    attempt_count: int = 0
    max_attempts: int = 3
    metadata: dict = Field(
        default_factory=dict, description="Optional metadata for this task."
    )

    is_final: bool = Field(
        default=False,
        description=(
            "Marks this task as a final deliverable for the overall run. "
            "This should be set deterministically by the supervisor (via a tool), "
            "not guessed by workers."
        ),
    )

    @property
    def latest_output(self):
        return self.iteration_history[-1].output if self.iteration_history else None
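
As a rough illustration of how `latest_output` behaves, the sketch below mirrors only the fields it needs using plain dataclasses, so it runs without Pydantask installed. `TaskItemSketch` and `Attempt` are hypothetical stand-ins; the real type of the entries in `iteration_history` is not shown in this section, beyond the fact that the property reads an `.output` attribute from them.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class Attempt:
    """Hypothetical history entry; only the `.output` attribute is assumed."""

    output: Any


@dataclass
class TaskItemSketch:
    """Stand-in mirroring a few TaskItem fields (not the real model)."""

    task_id: int
    iteration_history: List[Attempt] = field(default_factory=list)

    @property
    def latest_output(self) -> Optional[Any]:
        # Same expression as TaskItem.latest_output in the listing above.
        return self.iteration_history[-1].output if self.iteration_history else None


task = TaskItemSketch(task_id=1)
first = task.latest_output  # no attempts recorded yet, so this is None

task.iteration_history.append(Attempt(output="draft answer"))
task.iteration_history.append(Attempt(output="revised answer"))
latest = task.latest_output  # output of the most recent attempt
```

An empty history yields `None` rather than raising `IndexError`, so callers can test the property directly before using it.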

Bases: BaseModel

Canonical result type for any sub-task executed by DeepAgent.

TaskResult stores the output from any given task.

Attributes:

Name Type Description
task_id int

ID of the :class:TaskItem this result belongs to.

status TaskStatus

Outcome of this task execution (COMPLETED/ERRORED/FAILED/etc.).

summary str

Human-readable summary of what this task produced or concluded.

detailed_output str

Detailed output for the task if required.

notes List[str]

Any notes or scratch references used while completing this task.

sources List[SourceRef]

Structured list of :class:SourceRef citations used in this result. Inline citations should reference SourceRef.id values.

error_msg Optional[str]

Explanation of what went wrong if the task errored or failed.

metadata dict

Optional free-form metadata specific to this task execution.

Source code in src/pydantask/models/models.py
class TaskResult(BaseModel):
    """Canonical result type for any sub-task executed by DeepAgent.

    TaskResult stores the output from any given task.

    Attributes:
        task_id: ID of the :class:`TaskItem` this result belongs to.
        status: Outcome of this task execution (COMPLETED/ERRORED/FAILED/etc.).
        summary: Human-readable summary of what this task produced or concluded.
        detailed_output: Detailed output for the task if required.
        notes: Any notes or scratch references used while completing this task.
        sources: Structured list of :class:`SourceRef` citations used in this
            result. Inline citations should reference ``SourceRef.id`` values.
        error_msg: Explanation of what went wrong if the task errored or failed.
        metadata: Optional free-form metadata specific to this task execution.
    """

    task_id: int = Field(description="ID of the TaskItem this result belongs to.")

    status: TaskStatus = Field(
        default=TaskStatus.COMPLETED,
        description="Outcome of this specific task execution.",
    )

    summary: str = Field(
        default="",
        description=(
            "Concise, human-readable summary of what this task produced or concluded. "
        ),
    )

    detailed_output: str = Field(
        default="", description="Detailed output for the task if required."
    )

    notes: List[str] = Field(
        default_factory=list,
        description="All notes or scratch file paths that were used to help complete this task.",
    )

    sources: List[SourceRef] = Field(
        default_factory=list,
        description=(
            "Structured list of sources used to produce this result. "
            "Inline citations in summaries should reference SourceRef.id values, "
            "e.g. [1], [2]."
        ),
    )

    error_msg: Optional[str] = Field(
        default=None,
        description=(
            "If status is ERRORED or FAILED, a clear explanation of what went "
            "wrong or what information/tools were missing."
        ),
    )

    metadata: dict = Field(
        default_factory=dict,
        description=(
            "Optional free-form metadata (e.g. timestamps, scoring, extra flags) "
            "specific to this task execution."
        ),
    )
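
The `sources` description asks that inline citations such as `[1]` reference `SourceRef.id` values. One way to check that convention is sketched below. It is illustrative only: `SourceRefSketch`, `TaskResultSketch`, and `unresolved_citations` are hypothetical names, and the sketch assumes integer ids, which this section does not confirm.

```python
import re
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class SourceRefSketch:
    """Stand-in for SourceRef; assumes integer ids that match "[n]" markers."""

    id: int


@dataclass
class TaskResultSketch:
    """Stand-in mirroring a few TaskResult fields (not the real model)."""

    task_id: int
    summary: str = ""
    sources: List[SourceRefSketch] = field(default_factory=list)


def unresolved_citations(result: TaskResultSketch) -> Set[int]:
    """Return citation numbers used in the summary that have no matching source."""
    cited = {int(number) for number in re.findall(r"\[(\d+)\]", result.summary)}
    known = {source.id for source in result.sources}
    return cited - known


result = TaskResultSketch(
    task_id=7,
    summary="Revenue grew in 2025 [1], but margins fell [3].",
    sources=[SourceRefSketch(id=1), SourceRefSketch(id=2)],
)
missing = unresolved_citations(result)  # citation [3] has no SourceRef
```

A non-empty result could be surfaced through `error_msg` or passed to the QA step, depending on how strict the run should be.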

Bases: BaseModel

Evaluation result produced by the critic/QA agent for a single task.

Attributes:

Name Type Description
task_id int

ID of the :class:TaskItem being evaluated. It MUST match the task that is being evaluated.

reasoning str

Detailed explanation of how the result was judged and why you scored the way you did.

passed bool

True if the worker output sufficiently meets the task objective.

Source code in src/pydantask/models/models.py
class TaskQAResult(BaseModel):
    """Evaluation result produced by the critic/QA agent for a single task.

    Attributes:
        task_id: ID of the :class:`TaskItem` being evaluated. It MUST match the task that is being evaluated.
        reasoning: Detailed explanation of how the result was judged and why you scored the way you did.
        passed: True if the worker output sufficiently meets the task objective.
    """

    task_id: int = Field(
        default=-1,
        description="The task_id of the task being evaluated. E.g. if the task being critiqued has task_id 1, this field must also be 1.",
    )
    reasoning: str = Field(
        default="",
        description="Detailed explanation of how the result was judged, why it got the score that it did, and feedback for the supervisor to attempt a retry.",
    )

    passed: bool = Field(
        default=False,
        description="Whether the task passed qa/critic. True if you found that the worker output sufficiently meets the task objective.",
    )
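
Because `task_id` defaults to `-1` and `passed` defaults to `False`, a consumer of `TaskQAResult` probably wants to confirm the id before acting on the verdict. The sketch below shows one possible handling that combines the verdict with `attempt_count` and `max_attempts` from `TaskItem`. The stand-in classes, the `apply_qa` helper, and the `"accept"`/`"retry"`/`"give_up"` labels are all hypothetical; Pydantask's actual retry logic is not shown in this section.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskQAResultSketch:
    """Stand-in with the same defaults as TaskQAResult."""

    task_id: int = -1
    reasoning: str = ""
    passed: bool = False


@dataclass
class TaskSketch:
    """Stand-in mirroring the retry-related TaskItem fields."""

    task_id: int
    attempt_count: int = 0
    max_attempts: int = 3
    task_feedback: Optional[TaskQAResultSketch] = None


def apply_qa(task: TaskSketch, qa: TaskQAResultSketch) -> str:
    """Attach the QA verdict to the task and return a hypothetical next action."""
    if qa.task_id != task.task_id:
        # Also catches the default task_id of -1 from an unfilled QA result.
        raise ValueError(f"QA result is for task {qa.task_id}, not {task.task_id}")
    task.task_feedback = qa
    if qa.passed:
        return "accept"
    return "retry" if task.attempt_count < task.max_attempts else "give_up"


accepted = apply_qa(TaskSketch(task_id=1, attempt_count=1), TaskQAResultSketch(1, "Meets objective.", True))
retried = apply_qa(TaskSketch(task_id=2, attempt_count=1), TaskQAResultSketch(2, "Missing sources.", False))
exhausted = apply_qa(TaskSketch(task_id=3, attempt_count=3), TaskQAResultSketch(3, "Still incomplete.", False))
```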

Bases: BaseModel

Planner output: internal reasoning plus the list of tasks.

DeepAgent converts tasks into a Dict[int, TaskItem] for :class:RuntimeState.plan.

Attributes:

Name Type Description
reasoning_steps str

Planner's internal reasoning/chain-of-thought.

tasks list[TaskItem]

Ordered list of :class:TaskItem definitions.

Source code in src/pydantask/models/models.py
class Plan(BaseModel):
    """Planner output: internal reasoning plus the list of tasks.

    ``DeepAgent`` converts ``tasks`` into a ``Dict[int, TaskItem]`` for
    :class:`RuntimeState.plan`.

    Attributes:
        reasoning_steps: Planner's internal reasoning/chain-of-thought.
        tasks: Ordered list of :class:`TaskItem` definitions.
    """

    reasoning_steps: str = Field(
        description="Internal reasoning before finalizing the plan"
    )
    tasks: list[TaskItem]
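
The conversion from `Plan.tasks` to a `Dict[int, TaskItem]` can be pictured as indexing the list by `task_id`. The sketch below is a minimal version under stated assumptions: `TaskSketch` and `index_tasks` are hypothetical, and the duplicate-id and unknown-dependency checks are additions for illustration, since this section does not show how `DeepAgent` handles either case.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TaskSketch:
    """Stand-in mirroring a few TaskItem fields (not the real model)."""

    task_id: int
    sub_task_objective: str
    sub_task_dependencies: List[int] = field(default_factory=list)


def index_tasks(tasks: List[TaskSketch]) -> Dict[int, TaskSketch]:
    """Index an ordered task list by task_id, validating ids and dependencies."""
    plan: Dict[int, TaskSketch] = {}
    for task in tasks:
        if task.task_id in plan:
            raise ValueError(f"duplicate task_id {task.task_id}")
        plan[task.task_id] = task
    for task in tasks:
        unknown = [dep for dep in task.sub_task_dependencies if dep not in plan]
        if unknown:
            raise ValueError(f"task {task.task_id} depends on unknown tasks {unknown}")
    return plan


plan = index_tasks(
    [
        TaskSketch(task_id=1, sub_task_objective="Collect filings"),
        TaskSketch(task_id=2, sub_task_objective="Draft summary", sub_task_dependencies=[1]),
    ]
)
```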

Bases: BaseModel

Supervisor's decision for a single control-loop iteration.

Attributes:

Name Type Description
reasoning str

Explanation of why particular tasks were chosen or why the run should end.

tasks_to_execute List[int]

IDs of tasks that should be executed next.

feedback_to_subagents Optional[Dict[int, str]]

Optional mapping from task_id to feedback string to be injected into sub-agent execution.

all_tasks_completed bool

True if the plan is complete or cannot be progressed further.

Source code in src/pydantask/models/models.py
class SupervisorDecision(BaseModel):
    """Supervisor's decision for a single control-loop iteration.

    Attributes:
        reasoning: Explanation of why particular tasks were chosen or why the
            run should end.
        tasks_to_execute: IDs of tasks that should be executed next.
        feedback_to_subagents: Optional mapping from task_id to feedback string
            to be injected into sub-agent execution.
        all_tasks_completed: True if the plan is complete or cannot be
            progressed further.
    """

    # status: Literal["DELEGATE", "REPLAN", "COMPLETE", "ERROR"]
    reasoning: str = Field(
        description="Reasoning for why these tasks need to be completed next or the reasoning for when we are done executing."
    )
    tasks_to_execute: List[int] = Field(description="List of task IDs to execute.")
    feedback_to_subagents: Optional[Dict[int, str]] = Field(
        default=None,
        description="Any feedback to the sub-agents if additional context or instructions need to be given to the sub-agent for a particular task. Dict is key: task_id, value: feedback for the sub-agent for the given task.",
    )
    all_tasks_completed: bool = Field(
        default=False,
        description="Set this to true if all tasks in the plan have been completed OR there are so many errors that the plan cannot be completed.",
    )
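
A control loop that consumes a `SupervisorDecision` might filter `tasks_to_execute` against completed tasks and unmet dependencies, and pair each remaining id with its optional feedback. The sketch below illustrates that idea only. `SupervisorDecisionSketch` and `select_runnable` are hypothetical names, and the filtering rules are assumptions rather than Pydantask's documented behaviour.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class SupervisorDecisionSketch:
    """Stand-in with the same fields and defaults as SupervisorDecision."""

    reasoning: str
    tasks_to_execute: List[int]
    feedback_to_subagents: Optional[Dict[int, str]] = None
    all_tasks_completed: bool = False


def select_runnable(
    decision: SupervisorDecisionSketch,
    dependencies: Dict[int, List[int]],
    completed_steps: Set[int],
) -> List[Tuple[int, Optional[str]]]:
    """Return (task_id, feedback) pairs whose dependencies are all completed."""
    if decision.all_tasks_completed:
        return []
    # feedback_to_subagents defaults to None, so fall back to an empty mapping.
    feedback = decision.feedback_to_subagents or {}
    runnable = []
    for task_id in decision.tasks_to_execute:
        if task_id in completed_steps:
            continue  # already done, nothing to run
        if all(dep in completed_steps for dep in dependencies.get(task_id, [])):
            runnable.append((task_id, feedback.get(task_id)))
    return runnable


decision = SupervisorDecisionSketch(
    reasoning="Task 1 is done; continue with its dependents.",
    tasks_to_execute=[1, 2, 3],
    feedback_to_subagents={2: "Cite primary sources."},
)
dependencies = {1: [], 2: [1], 3: [2]}
selected = select_runnable(decision, dependencies, completed_steps={1})
```

Here task 1 is skipped as already complete and task 3 is held back until task 2 finishes, so only task 2 is selected, together with its feedback string.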

Runtime and Capability Models

Bases: BaseModel

Shared mutable state passed between agents during a DeepAgent run.

Holds the current plan, objective, registry of capabilities, and simple in-memory stores for documents and knowledge.

Attributes:

Name Type Description
plan Dict[int, TaskItem]

Mapping from task_id to :class:TaskItem for the current plan.

objective str

The overall user objective being solved.

capability_registry Dict[str, Any]

Mapping from capability name to its description/implementation (excluded from serialization).

completed_steps set[int]

Set of task_ids that have been completed.

next_task_id int

The next valid task id that could be added to a running plan.

runtime_steps int

Number of outer control-loop cycles executed so far.

tokens_used int

Placeholder for token accounting.

task_queue List[TaskItem]

Optional queue of additional tasks.

knowledge_store Dict[str, KnowledgeRecord]

Mapping of logical IDs to :class:KnowledgeRecord entries.

document_store Dict[str, str]

Mapping of logical document keys to in-memory content or identifiers.

checkpoint_recorder Any | None

Optional recorder used for event-sourced checkpointing (excluded from serialization).

Source code in src/pydantask/models/models.py
class RuntimeState(BaseModel):
    """Shared mutable state passed between agents during a DeepAgent run.

    Holds the current plan, objective, registry of capabilities, and simple
    in-memory stores for documents and knowledge.

    Attributes:
        plan: Mapping from task_id to :class:`TaskItem` for the current plan.
        objective: The overall user objective being solved.
        capability_registry: Mapping from capability name to its description/
            implementation (excluded from serialization).
        completed_steps: Set of task_ids that have been completed.
        next_task_id: The next valid task id that could be added to a running plan.
        runtime_steps: Number of outer control-loop cycles executed so far.
        tokens_used: Placeholder for token accounting.
        task_queue: Optional queue of additional tasks.
        knowledge_store: Mapping of logical IDs to :class:`KnowledgeRecord`s.
        document_store: Mapping of logical document keys to in-memory content or identifiers.
        checkpoint_recorder: Optional recorder used for event-sourced
            checkpointing (excluded from serialization).
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    plan: Dict[int, TaskItem] = Field(
        description="The plan that is generated to solve the users objective.",
        default_factory=dict,
    )
    objective: str = Field(description="The overall objective to solve for.")
    capability_registry: Dict[str, Any] = Field(
        description="Capabilities available to perform tasks.",
        default_factory=dict,
        exclude=True,
    )
    completed_steps: set[int] = Field(
        description="Steps from the plan that have been completed.", default_factory=set
    )
    next_task_id: int = Field(
        description="The next valid Task id value that could be added to a running plan"
    )
    runtime_steps: int = 0
    tokens_used: int = 0
    task_queue: List[TaskItem] = Field(default_factory=list)
    knowledge_store: Dict[str, KnowledgeRecord] = Field(
        default_factory=dict,
        description=(
            "Knowledge Store: maps logical IDs to KnowledgeRecord entries "
            "(files, summaries, notes, etc.). "
            "Accumulated information is stored here for other agents to use if needed."
        ),
    )  # simple in-memory knowledge store
    document_store: Dict[str, str] = Field(
        description=(
            "In-memory document store for this run. Keys may be used by tools to store "
            "notes or intermediate artifacts."
        ),
        default_factory=dict,
    )
    checkpoint_recorder: Any | None = Field(
        default=None,
        description="Optional recorder used for event-sourced checkpointing.",
        exclude=True,
    )
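
`next_task_id` is described as the next valid id that could be added to a running plan. A minimal sketch of how such a counter could be used when appending a task is shown below. `RuntimeStateSketch` and `add_task` are hypothetical, plan entries are plain dicts rather than `TaskItem` instances, and advancing the counter by one is an assumption; the actual allocation logic is not shown in this section.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class RuntimeStateSketch:
    """Stand-in mirroring a few RuntimeState fields (not the real model)."""

    objective: str
    next_task_id: int
    plan: Dict[int, dict] = field(default_factory=dict)
    completed_steps: Set[int] = field(default_factory=set)


def add_task(state: RuntimeStateSketch, sub_task_objective: str) -> int:
    """Add a task under the next free id and advance the counter (assumed +1)."""
    task_id = state.next_task_id
    if task_id in state.plan:
        raise ValueError(f"task_id {task_id} is already in the plan")
    state.plan[task_id] = {
        "task_id": task_id,
        "overall_objective": state.objective,
        "sub_task_objective": sub_task_objective,
    }
    state.next_task_id = task_id + 1
    return task_id


state = RuntimeStateSketch(objective="Summarise Q3 results", next_task_id=1)
first_id = add_task(state, "Collect filings")
second_id = add_task(state, "Draft summary")
state.completed_steps.add(first_id)
pending = sorted(set(state.plan) - state.completed_steps)  # ids still to be done
```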

Bases: BaseModel

Metadata describing a capability (sub-agent or tool) available to DeepAgent.

The planner and supervisor see name and description when deciding which capability to assign to TaskItem.capability. tool_func holds the underlying implementation (often a :class:pydantic_ai.Agent or callable tool).

Attributes:

Name Type Description
name str

Identifier of the capability (e.g. "research_agent").

description str

Human-readable description of what this capability does.

tool_func Any

Concrete implementation (agent instance or callable tool).

input_schema Optional[type[BaseModel]]

Optional Pydantic model class describing structured input for this capability.

Source code in src/pydantask/models/models.py
class CapabilityDescription(BaseModel):
    """Metadata describing a capability (sub-agent or tool) available to DeepAgent.

    The planner and supervisor see ``name`` and ``description`` when deciding
    which capability to assign to ``TaskItem.capability``. ``tool_func`` holds
    the underlying implementation (often a :class:`pydantic_ai.Agent` or
    callable tool).

    Attributes:
        name: Identifier of the capability (e.g. "research_agent").
        description: Human-readable description of what this capability does.
        tool_func: Concrete implementation (agent instance or callable tool).
        input_schema: Optional Pydantic model class describing structured input
            for this capability.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(
        description="Name of the agent/capability, e.g. 'web_search', 'file_writer', etc."
    )
    description: str = Field(
        description="Human-readable description of what this capability does."
    )
    # We keep this very loose to avoid Pydantic trying to introspect complex
    # types like `pydantic_ai.Agent` (which can reference optional imports
    # and cause schema generation issues). At runtime this will typically be
    # either a pydantic_ai Agent instance or a callable tool.
    tool_func: Any = Field(
        description=(
            "Concrete implementation of the capability. Usually a pydantic_ai Agent "
            "instance or an async/sync callable used as a tool."
        )
    )
    # Optional Pydantic model *class* describing structured input, if you
    # want to be explicit about what this capability expects.
    input_schema: Optional[type[BaseModel]] = Field(
        default=None,
        description=(
            "Optional Pydantic model class that defines the expected input schema "
            "for this capability, for better prompting and validation."
        ),
    )
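
Since `tool_func` may be either a sync or an async callable, code that invokes a capability has to handle both. The sketch below shows one common way to do that with the standard library. `call_tool`, `word_count`, and `shout` are hypothetical names, and the sketch covers plain callables only; invoking an agent instance is not covered here.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_tool(tool_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call a sync or async callable and return its final result."""
    result = tool_func(*args, **kwargs)
    if inspect.isawaitable(result):
        # Async callables return an awaitable that still has to be awaited.
        result = await result
    return result


def word_count(text: str) -> int:
    """Hypothetical sync tool."""
    return len(text.split())


async def shout(text: str) -> str:
    """Hypothetical async tool."""
    await asyncio.sleep(0)
    return text.upper()


sync_result = asyncio.run(call_tool(word_count, "plan supervise execute"))
async_result = asyncio.run(call_tool(shout, "done"))
```

Checking the returned value with `inspect.isawaitable` also handles sync wrappers that return a coroutine, which a check on the function itself would miss.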

Bases: BaseModel

Logical record of a knowledge artifact (file, summary, notes, etc.).

Stored in :class:RuntimeState.knowledge_store to track long-lived documents and their relationship to tasks.

Attributes:

Name Type Description
id str

Logical identifier (what tools use as key).

path Optional[str]

Filesystem path if this record is backed by a file.

task_ids list[int]

IDs of TaskItems this record is associated with.

summary Optional[str]

Short human-readable description of the content.

source_task_ids List[int]

IDs of tasks that produced or updated this record.

created_at datetime

Timestamp when the record was created.

metadata dict

Arbitrary extra info (tags, type=research/plan/etc.).

Source code in src/pydantask/models/models.py
class KnowledgeRecord(BaseModel):
    """Logical record of a knowledge artifact (file, summary, notes, etc.).

    Stored in :class:`RuntimeState.knowledge_store` to track long-lived documents
    and their relationship to tasks.

    Attributes:
        id: Logical identifier (what tools use as key).
        path: Filesystem path if this record is backed by a file.
        task_ids: IDs of TaskItems this record is associated with.
        summary: Short human-readable description of the content.
        source_task_ids: IDs of tasks that produced or updated this record.
        created_at: Timestamp when the record was created.
        metadata: Arbitrary extra info (tags, type=research/plan/etc.).
    """

    id: str = Field(description="Logical identifier (what tools use as key).")
    path: Optional[str] = Field(
        default=None, description="Filesystem path if this is backed by a file."
    )
    task_ids: list[int] = Field(
        description="which TaskItems this relates to (if any)", default_factory=list
    )
    summary: Optional[str] = Field(
        default=None, description="Short human-readable description of the content."
    )
    source_task_ids: List[int] = Field(
        default_factory=list,
        description="Tasks that produced or updated this document.",
    )
    created_at: datetime = Field(
        default_factory=datetime.now,
        description="When this knowledge item was created.",
    )
    metadata: dict = Field(
        default_factory=dict,
        description="Arbitrary extra info (tags, type=research/plan/etc).",
    )
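
To illustrate how records in `knowledge_store` might accumulate provenance over a run, the sketch below creates a record on first use and appends to `source_task_ids` on later updates. `KnowledgeRecordSketch` and `upsert_record` are hypothetical; the stand-in copies the field names and defaults listed above but uses a dataclass so the snippet runs without Pydantask installed.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class KnowledgeRecordSketch:
    """Stand-in with the same field names and defaults as KnowledgeRecord."""

    id: str
    path: Optional[str] = None
    task_ids: List[int] = field(default_factory=list)
    summary: Optional[str] = None
    source_task_ids: List[int] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


def upsert_record(
    store: Dict[str, KnowledgeRecordSketch],
    record_id: str,
    task_id: int,
    summary: Optional[str] = None,
) -> KnowledgeRecordSketch:
    """Create the record if missing, then note which task produced or updated it."""
    record = store.get(record_id)
    if record is None:
        record = KnowledgeRecordSketch(id=record_id)
        store[record_id] = record
    if task_id not in record.source_task_ids:
        record.source_task_ids.append(task_id)
    if summary is not None:
        record.summary = summary
    return record


knowledge_store: Dict[str, KnowledgeRecordSketch] = {}
created = upsert_record(knowledge_store, "q3-notes", task_id=3, summary="Raw notes")
created_at = created.created_at
updated = upsert_record(knowledge_store, "q3-notes", task_id=5, summary="Cleaned notes")
```

Because `created_at` is filled by a default factory only when the record is constructed, later updates leave the original timestamp untouched.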

See the source code for any additional helpers and the most up‑to‑date details.