|
@@ -215,6 +215,7 @@ class TransparentSession:
|
|
|
closed: bool = False
|
|
closed: bool = False
|
|
|
pump_task: asyncio.Task | None = None
|
|
pump_task: asyncio.Task | None = None
|
|
|
loser_close_task: asyncio.Task | None = None
|
|
loser_close_task: asyncio.Task | None = None
|
|
|
|
|
+ open_tasks: list[asyncio.Task] = field(default_factory=list)
|
|
|
|
|
|
|
|
def _record_win(self, winner: BasePath) -> None:
|
|
def _record_win(self, winner: BasePath) -> None:
|
|
|
self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
|
|
self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
|
|
@@ -237,8 +238,8 @@ class TransparentSession:
|
|
|
print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail} family_pref={family_pref} family={family_key} family_direct={family_direct} family_relay={family_relay}")
|
|
print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail} family_pref={family_pref} family={family_key} family_direct={family_direct} family_relay={family_relay}")
|
|
|
|
|
|
|
|
async def start(self) -> None:
|
|
async def start(self) -> None:
|
|
|
- await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
|
|
|
|
|
- await asyncio.wait_for(self.open_event.wait(), timeout=15)
|
|
|
|
|
|
|
+ self.open_tasks = [asyncio.create_task(path.open(self.target)) for path in self.paths]
|
|
|
|
|
+ await asyncio.wait_for(self.open_event.wait(), timeout=8)
|
|
|
if self.opened_count == 0:
|
|
if self.opened_count == 0:
|
|
|
raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
|
|
raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
|
|
|
self.pump_task = asyncio.create_task(self._pump_local())
|
|
self.pump_task = asyncio.create_task(self._pump_local())
|
|
@@ -311,7 +312,12 @@ class TransparentSession:
|
|
|
if self.closed:
|
|
if self.closed:
|
|
|
return
|
|
return
|
|
|
self.closed = True
|
|
self.closed = True
|
|
|
- print(f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port}")
|
|
|
|
|
|
|
+ if self.errors:
|
|
|
|
|
+ detail = ", ".join(self.errors[:3])
|
|
|
|
|
+ print(
|
|
|
|
|
+ f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port} "
|
|
|
|
|
+ f"errors={len(self.errors)} detail={detail}"
|
|
|
|
|
+ )
|
|
|
if self.pump_task and self.pump_task is not asyncio.current_task():
|
|
if self.pump_task and self.pump_task is not asyncio.current_task():
|
|
|
self.pump_task.cancel()
|
|
self.pump_task.cancel()
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|
|
@@ -320,6 +326,13 @@ class TransparentSession:
|
|
|
self.loser_close_task.cancel()
|
|
self.loser_close_task.cancel()
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|
|
|
await self.loser_close_task
|
|
await self.loser_close_task
|
|
|
|
|
+ for task in self.open_tasks:
|
|
|
|
|
+ if task is not asyncio.current_task():
|
|
|
|
|
+ task.cancel()
|
|
|
|
|
+ for task in self.open_tasks:
|
|
|
|
|
+ if task is not asyncio.current_task():
|
|
|
|
|
+ with contextlib.suppress(Exception):
|
|
|
|
|
+ await task
|
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
|
self.writer.close()
|
|
self.writer.close()
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|
|
@@ -485,10 +498,6 @@ class UdpFlow:
|
|
|
self.send_task.cancel()
|
|
self.send_task.cancel()
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|
|
|
await self.send_task
|
|
await self.send_task
|
|
|
- print(
|
|
|
|
|
- f"[edge] udp flow={self.flow_id} closed target={self.target.host}:{self.target.port} "
|
|
|
|
|
- f"sent={self.packets_sent} received={self.packets_received} dup={self.duplicate_responses}"
|
|
|
|
|
- )
|
|
|
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -665,6 +674,8 @@ class TransparentEdge:
|
|
|
await asyncio.gather(server4.serve_forever(), server6.serve_forever())
|
|
await asyncio.gather(server4.serve_forever(), server6.serve_forever())
|
|
|
|
|
|
|
|
def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
|
|
def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
|
|
|
|
|
+ if target.family == socket.AF_INET6 and not self.config.direct_ipv6_enabled:
|
|
|
|
|
+ return 0
|
|
|
base = self.config.direct_redundancy
|
|
base = self.config.direct_redundancy
|
|
|
if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
|
|
if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
|
|
|
base = self.config.direct_redundancy_v6
|
|
base = self.config.direct_redundancy_v6
|
|
@@ -682,6 +693,8 @@ class TransparentEdge:
|
|
|
|
|
|
|
|
def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
|
|
def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
|
|
|
count = self._direct_redundancy_for_target(session.target)
|
|
count = self._direct_redundancy_for_target(session.target)
|
|
|
|
|
+ if count <= 0:
|
|
|
|
|
+ return []
|
|
|
return [
|
|
return [
|
|
|
DirectTcpPath(
|
|
DirectTcpPath(
|
|
|
name=f"direct-{index + 1}" if count > 1 else "direct",
|
|
name=f"direct-{index + 1}" if count > 1 else "direct",
|
|
@@ -694,6 +707,8 @@ class TransparentEdge:
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
def _build_udp_direct_paths(self, target: TargetAddress, flow_id: int) -> list[BasePath]:
|
|
def _build_udp_direct_paths(self, target: TargetAddress, flow_id: int) -> list[BasePath]:
|
|
|
|
|
+ if target.family == socket.AF_INET6 and not self.config.direct_ipv6_enabled:
|
|
|
|
|
+ return []
|
|
|
count = max(1, self.config.udp_direct_redundancy)
|
|
count = max(1, self.config.udp_direct_redundancy)
|
|
|
if target.family == socket.AF_INET6 and self.config.udp_direct_redundancy_v6 is not None:
|
|
if target.family == socket.AF_INET6 and self.config.udp_direct_redundancy_v6 is not None:
|
|
|
count = max(1, self.config.udp_direct_redundancy_v6)
|
|
count = max(1, self.config.udp_direct_redundancy_v6)
|