69 lines
2.5 KiB
Python
69 lines
2.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from collections import deque
|
|
from threading import Lock
|
|
|
|
class ActionItem:
    """One deferred action plus the bookkeeping fields tracked for it.

    Attributes:
        key: Identifier the item is registered under.
        action: Callable stored for later execution.
        started: ``False`` on creation.
        data: ``None`` on creation.
    """

    def __init__(self, key, action):
        # What to run and the identifier it is filed under.
        self.key, self.action = key, action
        # Fresh items have not been started and carry no payload yet.
        self.started, self.data = False, None
|
|
|
|
class ActionQueue:
    """Keyed FIFO queue of deferred actions drained by one caller at a time.

    Attributes:
        actions: Maps each key to its item; an item stays here until it is
            cleared, either automatically after success or manually.
        queue: Items waiting to be executed, in insertion order.
        lock: Guards ``actions``, ``queue`` and ``is_running``.
        is_running: ``True`` while some caller is draining the queue.
    """

    def __init__(self):
        self.actions = {}
        self.queue = deque()
        self.lock = Lock()
        self.is_running = False

    def get_actions(self):
        """Return a shallow copy of the key -> item mapping.

        The copy can be traversed without holding the lock; the items
        themselves are shared with the queue, not copied.
        """
        with self.lock:
            return self.actions.copy()

    def enqueue_action(self, key, action):
        """Register ``action`` under ``key`` and append it to the queue.

        If ``key`` already has an item in ``actions`` (pending, running or
        failed and not yet cleared) the call is silently ignored.

        Args:
            key: Hashable identifier for the action.
            action: Callable invoked later with the item as its only argument.
        """
        with self.lock:
            if key in self.actions:
                # The key already has a pending action: reject any other.
                return
            item = ActionItem(key, action)
            self.actions[key] = item
            self.queue.append(item)

    def process_actions(self):
        """Drain the queue, running each item's action in FIFO order.

        Per the original comment this is the main method for deferred queue
        processing, called by the WSGI close handler. If another caller is
        already draining the queue this call returns immediately.

        An action that completes is removed from ``actions``. An action that
        raises stays in ``actions`` with the exception stored in
        ``item.data`` until ``clear_action`` is called for its key.
        """
        with self.lock:
            if self.is_running:
                # Another caller is already processing; nothing to do here.
                return
            # Claim the processing role in the same critical section as the
            # check, so two concurrent callers can never both start draining.
            self.is_running = True

        while True:
            with self.lock:
                if not self.queue:
                    # Release the role in the same critical section as the
                    # emptiness check, so an item enqueued afterwards is
                    # picked up by the next call instead of being stranded.
                    self.is_running = False
                    return
                item = self.queue.popleft()
                item.started = True

            # The action runs outside the lock so it may use the queue itself.
            try:
                item.action(item)
            except BaseException as e:
                # Store the exception and leave the item registered for
                # manual clearance.
                # NOTE(review): BaseException also captures KeyboardInterrupt
                # and SystemExit; confirm that swallowing them is intended.
                with self.lock:
                    item.data = e
            else:
                # Success: restore nominal state by dropping the item.
                self.clear_action(item.key)

    def clear_action(self, key):
        """Remove the item registered under ``key``; no-op if it is absent."""
        with self.lock:
            # pop() with a default replaces the membership test plus del.
            self.actions.pop(key, None)
|