def _init_executor(self) -> None:
    """Initialize the worker and load the model.

    If speculative decoding is enabled, we instead create the speculative
    worker.
    """
    if self.speculative_config is None:
        self._init_non_spec_worker()
    else:
        self._init_spec_worker()
def _init_non_spec_worker(self):
    assert self.parallel_config.world_size == 1, (
        "GPUExecutor only supports single GPU.")
def determine_num_available_blocks(self) -> Tuple[int, int]:
    """Determine the number of available KV blocks by invoking the
    underlying worker.
    """
    return self.driver_worker.determine_num_available_blocks()
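For a sense of scale, each KV block holds the key/value tensors for a fixed number of tokens across every layer. A rough, hand-written sketch of the per-block size (illustrative model dimensions only, not the worker's actual profiling code):

# Rough sketch: KV-cache bytes per block for a hypothetical model.
# All numbers below are illustrative assumptions.
block_size = 16          # tokens per KV block
num_layers = 32
num_kv_heads = 32
head_size = 128
dtype_bytes = 2          # fp16

# key + value tensors, for every layer, for one block of tokens
bytes_per_block = 2 * num_layers * num_kv_heads * head_size * block_size * dtype_bytes
print(f"{bytes_per_block / 2**20:.1f} MiB per KV block")  # ~8.0 MiB here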
def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
    """Initialize the KV cache by invoking the underlying worker.
    """
    # NOTE: This is logged in the executor because there can be >1 worker
    # with other executors. We could log in the engine level, but work
    # remains to abstract away the device for non-GPU configurations.
    logger.info("# GPU blocks: %d, # CPU blocks: %d", num_gpu_blocks,
                num_cpu_blocks)
    self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
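In the engine, these two executor calls are typically made back to back: profile first, then size the cache. A simplified sketch of that call order (the `executor` and `cache_config` names here are placeholders, not the exact engine code):

# Simplified sketch of how an engine might drive the two calls above.
num_gpu_blocks, num_cpu_blocks = executor.determine_num_available_blocks()

# If the config pins an explicit block count, it could take precedence here
# (hypothetical attribute lookup, shown defensively with getattr).
override = getattr(cache_config, "num_gpu_blocks_override", None)
if override is not None:
    num_gpu_blocks = override

executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)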
def initialize_ray_cluster(
    parallel_config: ParallelConfig,
    ray_address: Optional[str] = None,
):
    """Initialize the distributed cluster with Ray.

    It will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    if ray is None:
        raise ImportError(
            "Ray is not installed. Please install Ray to use distributed "
            "serving.")
    # Connect to a Ray cluster (initialize the Ray engine).
    if is_hip():
        # For AMD GPUs (ROCm), pass the GPU count explicitly.
        ray.init(address=ray_address,
                 ignore_reinit_error=True,
                 num_gpus=parallel_config.world_size)
    else:
        ray.init(address=ray_address, ignore_reinit_error=True)
    if parallel_config.placement_group:
        # Placement group is already set.
        return
    # Create placement group for worker processes.
    current_placement_group = ray.util.get_current_placement_group()
    if current_placement_group:
        # We are in a placement group.
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group.
        gpu_bundles = 0
        for bundle in bundles:
            bundle_gpus = bundle.get("GPU", 0)
            if bundle_gpus > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 GPU.")
            if bundle_gpus:
                gpu_bundles += 1
        if parallel_config.world_size > gpu_bundles:
            raise ValueError(
                "The number of required GPUs exceeds the total number of "
                "available GPUs in the placement group.")
    else:
        num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
        if parallel_config.world_size > num_gpus_in_cluster:
            raise ValueError(
                "The number of required GPUs exceeds the total number of "
                "available GPUs in the cluster.")
        # Create a new placement group.
        placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
        current_placement_group = ray.util.placement_group(
            placement_group_specs)
        # Wait until PG is ready - this will block until all
        # requested resources are available, and will timeout
        # if they cannot be provisioned.
        ray.get(current_placement_group.ready(), timeout=1800)
    # Set the placement group in the parallel config.
    parallel_config.placement_group = current_placement_group
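The placement-group dance above can be reproduced in isolation with public Ray APIs. A minimal, self-contained sketch (the two-GPU world size is just an illustrative assumption):

import ray

ray.init(ignore_reinit_error=True)

world_size = 2  # illustrative: one GPU bundle per distributed worker
pg = ray.util.placement_group([{"GPU": 1}] * world_size)

# Blocks until every bundle can be reserved; raises if the timeout expires.
ray.get(pg.ready(), timeout=1800)
print(pg.bundle_specs)  # e.g. [{'GPU': 1.0}, {'GPU': 1.0}]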
def _init_workers_ray(self, placement_group: "PlacementGroup",
                      **ray_remote_kwargs):
    ......
    self.driver_dummy_worker: Optional[RayWorkerWrapper] = None
    # The remaining workers are the actual ray actors.
    self.workers: List[RayWorkerWrapper] = []

    # Create the workers.
    driver_ip = get_ip()
    for bundle_id, bundle in enumerate(placement_group.bundle_specs):
        if not bundle.get("GPU", 0):
            continue
        scheduling_strategy = PlacementGroupSchedulingStrategy(
            placement_group=placement_group,
            placement_group_capture_child_tasks=True,
            placement_group_bundle_index=bundle_id,
        )
        worker = ray.remote(
            num_cpus=0,
            num_gpus=num_gpus,
            scheduling_strategy=scheduling_strategy,
            **ray_remote_kwargs,
        )(RayWorkerWrapper).remote(
            worker_module_name="vllm.worker.worker",
            worker_class_name="Worker",
            trust_remote_code=self.model_config.trust_remote_code,
        )
        worker_ip = ray.get(worker.get_node_ip.remote())
        if worker_ip == driver_ip and self.driver_dummy_worker is None:
            # If the worker is on the same node as the driver, we use it
            # as the resource holder for the driver process.
            self.driver_dummy_worker = worker
            self.driver_worker = RayWorkerWrapper(
                worker_module_name="vllm.worker.worker",
                worker_class_name="Worker",
                trust_remote_code=self.model_config.trust_remote_code,
            )
        else:
            # Else, added to the list of workers.
            self.workers.append(worker)
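Note the asymmetry: the actor co-located with the driver is kept only as a "dummy" resource holder, while the driver process itself constructs a local RayWorkerWrapper. A toy sketch of the same pattern (the ResourceHolder actor is hypothetical, purely for illustration):

import ray

@ray.remote(num_cpus=0, num_gpus=1)
class ResourceHolder:
    # Hypothetical actor: it does no work, it only pins one GPU bundle so
    # the driver process can use that GPU without itself being a Ray actor.
    def node_ip(self) -> str:
        return ray.util.get_node_ip_address()

holder = ResourceHolder.remote()
print(ray.get(holder.node_ip.remote()))  # same node IP as the driver here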
    # Get the set of GPU IDs used on each node.
    worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                use_dummy_driver=True)
    # Map each node to its worker ranks and to its (sorted) GPU IDs.
    node_workers = defaultdict(list)  # node id -> list of worker ranks
    node_gpus = defaultdict(list)     # node id -> list of GPU ids
    for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
        node_workers[node_id].append(i)
        node_gpus[node_id].extend(gpu_ids)
    for node_id, gpu_ids in node_gpus.items():
        node_gpus[node_id] = sorted(gpu_ids)
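With the per-node GPU lists sorted, the natural follow-up (elided here) is to give every worker on a node the same device visibility. A sketch of how the mapping could be flattened into CUDA_VISIBLE_DEVICES strings (an assumption about the downstream use, not shown in the excerpt):

# Sketch: build one CUDA_VISIBLE_DEVICES string per node from node_gpus.
visible_devices = {
    node_id: ",".join(map(str, gpu_ids))
    for node_id, gpu_ids in node_gpus.items()
}
# e.g. {'node-a': '0,1,2,3', 'node-b': '0,1,2,3'} for two 4-GPU nodes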