Expose queue unprocessed and items being processed len

This commit is contained in:
Jose Fernandez
2023-09-15 15:22:44 -06:00
committed by Pires
parent 2ee2d9f63b
commit 238aebad73
2 changed files with 51 additions and 3 deletions

View File

@@ -288,6 +288,24 @@ func (q *Queue) Len() int {
return q.items.Len() + len(q.itemsBeingProcessed)
}
// UnprocessedLen returns the count of items yet to be processed in the queue
func (q *Queue) UnprocessedLen() int {
q.lock.Lock()
defer q.lock.Unlock()
if q.items.Len() != len(q.itemsInQueue) {
panic("Internally inconsistent state")
}
return len(q.itemsInQueue)
}
// ProcessedLen returns the count items that are being processed
func (q *Queue) ItemsBeingProcessedLen() int {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.itemsBeingProcessed)
}
// Run starts the workers
//
// It blocks until context is cancelled, and all of the workers exit.

View File

@@ -453,21 +453,51 @@ func (pc *PodController) Err() error {
return pc.err
}
// SyncPodsFromKubernetesQueueLen returns the length of the SyncPodsFromKubernetes queue
// SyncPodsFromKubernetesQueueLen returns the length of the SyncPodsFromKubernetes queue, which include items being processed and unprocessed
func (pc *PodController) SyncPodsFromKubernetesQueueLen() int {
return pc.syncPodsFromKubernetes.Len()
}
// DeletePodsFromKubernetesQueueLen returns the length of the DeletePodsFromKubernetes queue
// SyncPodsFromKubernetesQueueUnprocessedLen returns the length of the unprocessed items in the SyncPodsFromKubernetes queue
func (pc *PodController) SyncPodsFromKubernetesQueueUnprocessedLen() int {
return pc.syncPodsFromKubernetes.UnprocessedLen()
}
// SyncPodsFromKubernetesQueueItemsBeingProcessedLen returns the length of the items being processed in the SyncPodsFromKubernetes queue
func (pc *PodController) SyncPodsFromKubernetesQueueItemsBeingProcessedLen() int {
return pc.syncPodsFromKubernetes.ItemsBeingProcessedLen()
}
// DeletePodsFromKubernetesQueueLen returns the length of the DeletePodsFromKubernetes queue, which include items being processed and unprocessed
func (pc *PodController) DeletePodsFromKubernetesQueueLen() int {
return pc.deletePodsFromKubernetes.Len()
}
// SyncPodStatusFromProviderQueueLen returns the length of the SyncPodStatusFromProvider queue
// DeletePodsFromKubernetesQueueUnprocessedLen returns the length of the unprocessed items in the DeletePodsFromKubernetes queue
func (pc *PodController) DeletePodsFromKubernetesQueueUnprocessedLen() int {
return pc.deletePodsFromKubernetes.UnprocessedLen()
}
// DeletePodsFromKubernetesQueueItemsBeingProcessedLen returns the length of the items being processed in the DeletePodsFromKubernetes queue
func (pc *PodController) DeletePodsFromKubernetesQueueItemsBeingProcessedLen() int {
return pc.deletePodsFromKubernetes.ItemsBeingProcessedLen()
}
// SyncPodStatusFromProviderQueueLen returns the length of the SyncPodStatusFromProvider queue, which include items being processed and unprocessed
func (pc *PodController) SyncPodStatusFromProviderQueueLen() int {
return pc.syncPodStatusFromProvider.Len()
}
// SyncPodStatusFromProviderQueueUnprocessedLen returns the length of the unprocessed items in the SyncPodStatusFromProvider queue
func (pc *PodController) SyncPodStatusFromProviderQueueUnprocessedLen() int {
return pc.syncPodStatusFromProvider.UnprocessedLen()
}
// SyncPodStatusFromProviderQueueItemsBeingProcessedLen returns the length of the items being processed in the SyncPodStatusFromProvider queue
func (pc *PodController) SyncPodStatusFromProviderQueueItemsBeingProcessedLen() int {
return pc.syncPodStatusFromProvider.ItemsBeingProcessedLen()
}
// syncPodFromKubernetesHandler compares the actual state with the desired, and attempts to converge the two.
func (pc *PodController) syncPodFromKubernetesHandler(ctx context.Context, key string) error {
ctx, span := trace.StartSpan(ctx, "syncPodFromKubernetesHandler")