# Viewing File: /home/ubuntu/combine_ai/combine/lib/python3.10/site-packages/uvicorn/protocols/http/flow_control.py
import asyncio
from uvicorn._types import (
ASGIReceiveCallable,
ASGISendCallable,
HTTPResponseBodyEvent,
HTTPResponseStartEvent,
Scope,
)
# (name, value) byte pair for a "connection: close" header.
CLOSE_HEADER = (b"connection", b"close")
# 65536 bytes (64 KiB). Not referenced in the visible part of this file;
# presumably a write-buffer threshold used by importers -- TODO confirm.
HIGH_WATER_LIMIT = 65536
class FlowControl:
    """Track read/write pause state for a single transport.

    ``read_paused`` and ``write_paused`` mirror the current state. Read
    pauses are forwarded to the transport; write pauses are represented by
    an ``asyncio.Event`` that ``drain()`` waits on. Every pause/resume call
    is a no-op when the state already matches.
    """

    def __init__(self, transport: asyncio.Transport) -> None:
        """Start with reading and writing both un-paused."""
        self._transport = transport
        self.read_paused = False
        self.write_paused = False
        # The event is set while writing is allowed, so it starts out set.
        writable = asyncio.Event()
        writable.set()
        self._is_writable_event = writable

    async def drain(self) -> None:
        """Wait until writing is not paused (returns at once if it isn't)."""
        await self._is_writable_event.wait()

    def pause_reading(self) -> None:
        """Pause reading on the transport unless it is already paused."""
        if self.read_paused:
            return
        self.read_paused = True
        self._transport.pause_reading()

    def resume_reading(self) -> None:
        """Resume reading on the transport if it is currently paused."""
        if not self.read_paused:
            return
        self.read_paused = False
        self._transport.resume_reading()

    def pause_writing(self) -> None:
        """Mark writing as paused so that ``drain()`` blocks."""
        if self.write_paused:
            return
        self.write_paused = True
        self._is_writable_event.clear()

    def resume_writing(self) -> None:
        """Mark writing as resumed, releasing anything waiting in ``drain()``."""
        if not self.write_paused:
            return
        self.write_paused = False
        self._is_writable_event.set()
async def service_unavailable(scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None:
    """Send a plain-text ``503 Service Unavailable`` response.

    Two messages are passed to ``send``: an ``http.response.start`` with
    status 503, then a single ``http.response.body`` carrying the complete
    body (``more_body`` is False). The response headers include
    ``connection: close``.

    Args:
        scope: Connection scope; not inspected.
        receive: Receive callable; never awaited.
        send: Awaitable callable that is given the two response messages.
    """
    body = b"Service Unavailable"
    response_start: "HTTPResponseStartEvent" = {
        "type": "http.response.start",
        "status": 503,
        "headers": [
            (b"content-type", b"text/plain; charset=utf-8"),
            # The body is fixed and fully known, so declare its exact size
            # up front. Deriving it from `body` keeps the two in sync.
            (b"content-length", str(len(body)).encode("ascii")),
            (b"connection", b"close"),
        ],
    }
    await send(response_start)
    response_body: "HTTPResponseBodyEvent" = {
        "type": "http.response.body",
        "body": body,
        "more_body": False,
    }
    await send(response_body)
# (file-viewer navigation text, not part of the module: "Back to Directory" / "File Manager")