-
Notifications
You must be signed in to change notification settings - Fork 1k
Move fabric8 discovery to listers #2133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e38ab5f
abc7c0f
08f0e87
51864ff
ba9b9fc
f02b796
7db1fab
bb4ecc6
536db1b
a283a66
cac69c1
ee91966
e4a735c
b2089f5
8b928c0
afe758f
41d3ae9
9389d52
3d7cda5
1a84473
a1daeee
dfe9930
6971ed1
8d618e1
e839167
4b0a9a6
b68952c
b1045d0
488e709
552d8a1
18b91cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,17 +21,22 @@ | |
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.function.Predicate; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import io.fabric8.kubernetes.api.model.EndpointAddress; | ||
| import io.fabric8.kubernetes.api.model.EndpointSubset; | ||
| import io.fabric8.kubernetes.api.model.Endpoints; | ||
| import io.fabric8.kubernetes.api.model.Service; | ||
| import io.fabric8.kubernetes.client.KubernetesClient; | ||
| import io.fabric8.kubernetes.client.informers.SharedIndexInformer; | ||
| import io.fabric8.kubernetes.client.informers.cache.Lister; | ||
| import jakarta.annotation.PostConstruct; | ||
| import jakarta.annotation.PreDestroy; | ||
| import org.apache.commons.logging.LogFactory; | ||
|
|
||
| import org.springframework.cloud.client.ServiceInstance; | ||
| import org.springframework.cloud.client.discovery.DiscoveryClient; | ||
| import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider; | ||
| import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; | ||
| import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata; | ||
| import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber; | ||
|
|
@@ -46,10 +51,9 @@ | |
| import static org.springframework.cloud.kubernetes.fabric8.Fabric8Utils.serviceMetadata; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8DiscoveryClientUtils.addresses; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8DiscoveryClientUtils.endpointSubsetsPortData; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8DiscoveryClientUtils.endpoints; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8DiscoveryClientUtils.services; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8InstanceIdHostPodNameSupplier.externalName; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8InstanceIdHostPodNameSupplier.nonExternalName; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8DiscoveryClientUtils.postConstruct; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8InstanceIdHostPodNameSupplier.fabric8InstanceIdHostPodNameSupplier; | ||
| import static org.springframework.cloud.kubernetes.fabric8.discovery.Fabric8PodLabelsAndAnnotationsSupplier.fabric8PodLabelsAndAnnotationsSupplier; | ||
|
|
||
| /** | ||
| * @author wind57 | ||
|
|
@@ -59,70 +63,97 @@ abstract class Fabric8AbstractBlockingDiscoveryClient implements DiscoveryClient | |
| private static final LogAccessor LOG = new LogAccessor( | ||
| LogFactory.getLog(Fabric8AbstractBlockingDiscoveryClient.class)); | ||
|
|
||
| private final KubernetesDiscoveryProperties properties; | ||
| private final List<Lister<Service>> serviceListers; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Listers and Informers here, just like the case for the k8s-native client. Unlike the native client, we don't have to deal with |
||
|
|
||
| private final ServicePortSecureResolver servicePortSecureResolver; | ||
| private final List<Lister<Endpoints>> endpointsListers; | ||
|
|
||
| private final KubernetesClient client; | ||
| private final List<SharedIndexInformer<Service>> serviceInformers; | ||
|
|
||
| private final KubernetesNamespaceProvider namespaceProvider; | ||
| private final List<SharedIndexInformer<Endpoints>> endpointsInformers; | ||
|
|
||
| private final Supplier<Boolean> informersReadyFunc; | ||
|
|
||
| private final KubernetesDiscoveryProperties properties; | ||
|
|
||
| private final Predicate<Service> predicate; | ||
|
|
||
| Fabric8AbstractBlockingDiscoveryClient(KubernetesClient client, | ||
| KubernetesDiscoveryProperties kubernetesDiscoveryProperties, | ||
| ServicePortSecureResolver servicePortSecureResolver, KubernetesNamespaceProvider namespaceProvider, | ||
| private final ServicePortSecureResolver servicePortSecureResolver; | ||
|
|
||
| private final KubernetesClient kubernetesClient; | ||
|
|
||
| Fabric8AbstractBlockingDiscoveryClient(KubernetesClient kubernetesClient, List<Lister<Service>> serviceListers, | ||
| List<Lister<Endpoints>> endpointsListers, List<SharedIndexInformer<Service>> serviceInformers, | ||
| List<SharedIndexInformer<Endpoints>> endpointsInformers, KubernetesDiscoveryProperties properties, | ||
| Predicate<Service> predicate) { | ||
|
|
||
| this.client = client; | ||
| this.properties = kubernetesDiscoveryProperties; | ||
| this.servicePortSecureResolver = servicePortSecureResolver; | ||
| this.namespaceProvider = namespaceProvider; | ||
| this.serviceListers = serviceListers; | ||
| this.endpointsListers = endpointsListers; | ||
| this.serviceInformers = serviceInformers; | ||
| this.endpointsInformers = endpointsInformers; | ||
| this.properties = properties; | ||
| this.predicate = predicate; | ||
| this.kubernetesClient = kubernetesClient; | ||
|
|
||
| servicePortSecureResolver = new ServicePortSecureResolver(properties); | ||
|
|
||
| this.informersReadyFunc = () -> { | ||
| boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream() | ||
| .map(SharedIndexInformer::hasSynced) | ||
| .reduce(Boolean::logicalAnd) | ||
| .orElse(false); | ||
| boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream() | ||
| .map(SharedIndexInformer::hasSynced) | ||
| .reduce(Boolean::logicalAnd) | ||
| .orElse(false); | ||
| return serviceInformersReady && endpointsInformersReady; | ||
| }; | ||
| } | ||
|
|
||
| public abstract String description(); | ||
|
|
||
| @Override | ||
| public List<ServiceInstance> getInstances(String serviceId) { | ||
| Objects.requireNonNull(serviceId); | ||
| Objects.requireNonNull(serviceId, "serviceId must be provided"); | ||
|
|
||
| List<Endpoints> allEndpoints = endpoints(properties, client, namespaceProvider, "fabric8-discovery", serviceId, | ||
| predicate); | ||
| List<Service> allServices = serviceListers.stream() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a 1-1 mapping with k8s-native, meaning we do things the same here, much easier to reason like this |
||
| .flatMap(x -> x.list().stream()) | ||
| .filter(service -> service.getMetadata() != null) | ||
| .filter(service -> serviceId.equals(service.getMetadata().getName())) | ||
| .toList(); | ||
|
|
||
| List<ServiceInstance> instances = new ArrayList<>(); | ||
| for (Endpoints endpoints : allEndpoints) { | ||
| // endpoints are only those that matched the serviceId | ||
| instances.addAll(serviceInstances(endpoints, serviceId)); | ||
| } | ||
| List<ServiceInstance> serviceInstances = allServices.stream() | ||
| .filter(predicate) | ||
| .flatMap(service -> serviceInstances(service).stream()) | ||
| .collect(Collectors.toCollection(ArrayList::new)); | ||
|
|
||
| if (properties.includeExternalNameServices()) { | ||
| LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId); | ||
| List<Service> services = services(properties, client, namespaceProvider, | ||
| s -> s.getSpec().getType().equals(EXTERNAL_NAME), Map.of("metadata.name", serviceId), | ||
| "fabric8-discovery"); | ||
| for (Service service : services) { | ||
| List<Service> externalNameServices = allServices.stream() | ||
| .filter(s -> s.getSpec() != null) | ||
| .filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType())) | ||
| .toList(); | ||
| for (Service service : externalNameServices) { | ||
| ServiceMetadata serviceMetadata = serviceMetadata(service); | ||
| Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata, | ||
| properties); | ||
|
|
||
| Fabric8InstanceIdHostPodNameSupplier supplierOne = externalName(service); | ||
|
|
||
| ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne, | ||
| serviceInstanceMetadata); | ||
|
|
||
| instances.add(externalNameServiceInstance); | ||
| Fabric8InstanceIdHostPodNameSupplier fabric8InstanceIdHostPodNameSupplier = fabric8InstanceIdHostPodNameSupplier( | ||
| service); | ||
| ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, | ||
| fabric8InstanceIdHostPodNameSupplier, serviceInstanceMetadata); | ||
| serviceInstances.add(externalNameServiceInstance); | ||
| } | ||
| } | ||
|
|
||
| return instances; | ||
| return serviceInstances; | ||
| } | ||
|
|
||
| @Override | ||
| public List<String> getServices() { | ||
| List<String> services = services(properties, client, namespaceProvider, predicate, null, "fabric8 discovery") | ||
| .stream() | ||
| .map(service -> service.getMetadata().getName()) | ||
| List<String> services = serviceListers.stream() | ||
| .flatMap(serviceLister -> serviceLister.list().stream()) | ||
| .filter(predicate) | ||
| .map(s -> s.getMetadata().getName()) | ||
| .distinct() | ||
| .toList(); | ||
| LOG.debug(() -> "will return services : " + services); | ||
|
|
@@ -134,38 +165,62 @@ public int getOrder() { | |
| return properties.order(); | ||
| } | ||
|
|
||
| private List<ServiceInstance> serviceInstances(Endpoints endpoints, String serviceId) { | ||
| @PostConstruct | ||
| void afterPropertiesSet() { | ||
| postConstruct(properties, informersReadyFunc, serviceListers); | ||
| } | ||
|
|
||
| List<EndpointSubset> subsets = endpoints.getSubsets(); | ||
| if (subsets.isEmpty()) { | ||
| LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets"); | ||
| return List.of(); | ||
| } | ||
| @PreDestroy | ||
| void preDestroy() { | ||
| serviceInformers.forEach(SharedIndexInformer::close); | ||
| endpointsInformers.forEach(SharedIndexInformer::close); | ||
| } | ||
|
|
||
| private List<ServiceInstance> serviceInstances(Service service) { | ||
|
|
||
| String serviceId = service.getMetadata().getName(); | ||
| String serviceNamespace = service.getMetadata().getNamespace(); | ||
|
|
||
| String namespace = endpoints.getMetadata().getNamespace(); | ||
| List<ServiceInstance> instances = new ArrayList<>(); | ||
|
|
||
| Service service = client.services().inNamespace(namespace).withName(serviceId).get(); | ||
| ServiceMetadata serviceMetadata = serviceMetadata(service); | ||
| Map<String, Integer> portsData = endpointSubsetsPortData(subsets); | ||
| List<Endpoints> allEndpoints = endpointsListers.stream() | ||
| .map(endpointsLister -> endpointsLister.namespace(serviceNamespace).get(serviceId)) | ||
| .filter(Objects::nonNull) | ||
| .toList(); | ||
|
|
||
| ServiceMetadata k8sServiceMetadata = serviceMetadata(service); | ||
|
|
||
| for (Endpoints endpoints : allEndpoints) { | ||
| List<EndpointSubset> subsets = endpoints.getSubsets(); | ||
| if (subsets == null || subsets.isEmpty()) { | ||
| LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets"); | ||
| } | ||
| else { | ||
| Map<String, Integer> portsData = endpointSubsetsPortData(subsets); | ||
| Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, k8sServiceMetadata, | ||
| properties); | ||
|
|
||
| Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata, properties); | ||
| for (EndpointSubset endpointSubset : subsets) { | ||
|
|
||
| for (EndpointSubset endpointSubset : subsets) { | ||
| Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset)); | ||
| ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, k8sServiceMetadata, | ||
| properties); | ||
|
|
||
| Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset)); | ||
| ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties); | ||
| List<EndpointAddress> addresses = addresses(endpointSubset, properties); | ||
| for (EndpointAddress endpointAddress : addresses) { | ||
|
|
||
| List<EndpointAddress> addresses = addresses(endpointSubset, properties); | ||
| for (EndpointAddress endpointAddress : addresses) { | ||
| Fabric8InstanceIdHostPodNameSupplier instanceIdHostPodNameSupplier = fabric8InstanceIdHostPodNameSupplier( | ||
| endpointAddress, service); | ||
| Fabric8PodLabelsAndAnnotationsSupplier podLabelsAndAnnotationsSupplier = fabric8PodLabelsAndAnnotationsSupplier( | ||
| kubernetesClient, service.getMetadata().getNamespace()); | ||
|
|
||
| Fabric8InstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress, service); | ||
| Fabric8PodLabelsAndAnnotationsSupplier supplierTwo = Fabric8PodLabelsAndAnnotationsSupplier | ||
| .nonExternalName(client, namespace); | ||
| ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, k8sServiceMetadata, | ||
| instanceIdHostPodNameSupplier, podLabelsAndAnnotationsSupplier, portData, | ||
| serviceInstanceMetadata, properties); | ||
| instances.add(serviceInstance); | ||
| } | ||
| } | ||
|
|
||
| ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata, | ||
| supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties); | ||
| instances.add(serviceInstance); | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we used to create this one in place where needed, but then I noticed that in the k8s-client native, we create in the auto-configuration. So I decided to do it in the fabric8 case too.
Also, it simplifies testing a lot, having it as a Bean