svn commit: r221059 - in head/sys: kern sys

Konstantin Belousov kib at FreeBSD.org
Tue Apr 26 11:39:56 UTC 2011


Author: kib
Date: Tue Apr 26 11:39:56 2011
New Revision: 221059
URL: http://svn.freebsd.org/changeset/base/221059

Log:
  Implement the delayed task execution extension to the taskqueue
  mechanism. The caller may specify a timeout in ticks after which the
  task will be scheduled.
  
  Sponsored by:	The FreeBSD Foundation
  Reviewed by:	jeff, jhb
  MFC after:	1 month

Added:
  head/sys/sys/_callout.h
     - copied, changed from r221058, head/sys/sys/callout.h
Modified:
  head/sys/kern/subr_taskqueue.c
  head/sys/sys/callout.h
  head/sys/sys/taskqueue.h

Modified: head/sys/kern/subr_taskqueue.c
==============================================================================
--- head/sys/kern/subr_taskqueue.c	Tue Apr 26 10:02:15 2011	(r221058)
+++ head/sys/kern/subr_taskqueue.c	Tue Apr 26 11:39:56 2011	(r221059)
@@ -61,12 +61,15 @@ struct taskqueue {
 	int			tq_tcount;
 	int			tq_spin;
 	int			tq_flags;
+	int			tq_callouts;
 };
 
 #define	TQ_FLAGS_ACTIVE		(1 << 0)
 #define	TQ_FLAGS_BLOCKED	(1 << 1)
 #define	TQ_FLAGS_PENDING	(1 << 2)
 
+#define	DT_CALLOUT_ARMED	(1 << 0)
+
 #define	TQ_LOCK(tq)							\
 	do {								\
 		if ((tq)->tq_spin)					\
@@ -83,6 +86,17 @@ struct taskqueue {
 			mtx_unlock(&(tq)->tq_mutex);			\
 	} while (0)
 
+void
+_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
+    int priority, task_fn_t func, void *context)
+{
+
+	TASK_INIT(&timeout_task->t, priority, func, context);
+	callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
+	timeout_task->q = queue;
+	timeout_task->f = 0;
+}
+
 static __inline int
 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
     int t)
@@ -129,7 +143,7 @@ static void
 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
 {
 
-	while (tq->tq_tcount > 0) {
+	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
 		wakeup(tq);
 		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
 	}
@@ -143,26 +157,24 @@ taskqueue_free(struct taskqueue *queue)
 	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
 	taskqueue_terminate(queue->tq_threads, queue);
 	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
+	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
 	mtx_destroy(&queue->tq_mutex);
 	free(queue->tq_threads, M_TASKQUEUE);
 	free(queue, M_TASKQUEUE);
 }
 
-int
-taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+static int
+taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
 {
 	struct task *ins;
 	struct task *prev;
 
-	TQ_LOCK(queue);
-
 	/*
 	 * Count multiple enqueues.
 	 */
 	if (task->ta_pending) {
 		task->ta_pending++;
-		TQ_UNLOCK(queue);
-		return 0;
+		return (0);
 	}
 
 	/*
@@ -190,9 +202,60 @@ taskqueue_enqueue(struct taskqueue *queu
 	else
 		queue->tq_flags |= TQ_FLAGS_PENDING;
 
+	return (0);
+}
+int
+taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+{
+	int res;
+
+	TQ_LOCK(queue);
+	res = taskqueue_enqueue_locked(queue, task);
 	TQ_UNLOCK(queue);
 
-	return 0;
+	return (res);
+}
+
+static void
+taskqueue_timeout_func(void *arg)
+{
+	struct taskqueue *queue;
+	struct timeout_task *timeout_task;
+
+	timeout_task = arg;
+	queue = timeout_task->q;
+	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
+	timeout_task->f &= ~DT_CALLOUT_ARMED;
+	queue->tq_callouts--;
+	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
+}
+
+int
+taskqueue_enqueue_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task, int ticks)
+{
+	int res;
+
+	TQ_LOCK(queue);
+	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
+	    ("Migrated queue"));
+	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
+	timeout_task->q = queue;
+	res = timeout_task->t.ta_pending;
+	if (ticks == 0) {
+		taskqueue_enqueue_locked(queue, &timeout_task->t);
+	} else {
+		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
+			res++;
+		} else {
+			queue->tq_callouts++;
+			timeout_task->f |= DT_CALLOUT_ARMED;
+		}
+		callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
+		    timeout_task);
+	}
+	TQ_UNLOCK(queue);
+	return (res);
 }
 
 void
@@ -271,6 +334,19 @@ task_is_running(struct taskqueue *queue,
 	return (0);
 }
 
+static int
+taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
+    u_int *pendp)
+{
+
+	if (task->ta_pending > 0)
+		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
+	if (pendp != NULL)
+		*pendp = task->ta_pending;
+	task->ta_pending = 0;
+	return (task_is_running(queue, task) ? EBUSY : 0);
+}
+
 int
 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
 {
@@ -278,14 +354,31 @@ taskqueue_cancel(struct taskqueue *queue
 	int error;
 
 	TQ_LOCK(queue);
-	if ((pending = task->ta_pending) > 0)
-		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
-	task->ta_pending = 0;
-	error = task_is_running(queue, task) ? EBUSY : 0;
+	pending = task->ta_pending;
+	error = taskqueue_cancel_locked(queue, task, pendp);
+	TQ_UNLOCK(queue);
+
+	return (error);
+}
+
+int
+taskqueue_cancel_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task, u_int *pendp)
+{
+	u_int pending, pending1;
+	int error;
+
+	TQ_LOCK(queue);
+	pending = !!callout_stop(&timeout_task->c);
+	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
+	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
+		timeout_task->f &= ~DT_CALLOUT_ARMED;
+		queue->tq_callouts--;
+	}
 	TQ_UNLOCK(queue);
 
 	if (pendp != NULL)
-		*pendp = pending;
+		*pendp = pending + pending1;
 	return (error);
 }
 
@@ -302,6 +395,15 @@ taskqueue_drain(struct taskqueue *queue,
 	TQ_UNLOCK(queue);
 }
 
+void
+taskqueue_drain_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task)
+{
+
+	callout_drain(&timeout_task->c);
+	taskqueue_drain(queue, &timeout_task->t);
+}
+
 static void
 taskqueue_swi_enqueue(void *context)
 {

Copied and modified: head/sys/sys/_callout.h (from r221058, head/sys/sys/callout.h)
==============================================================================
--- head/sys/sys/callout.h	Tue Apr 26 10:02:15 2011	(r221058, copy source)
+++ head/sys/sys/_callout.h	Tue Apr 26 11:39:56 2011	(r221059)
@@ -35,8 +35,8 @@
  * $FreeBSD$
  */
 
-#ifndef _SYS_CALLOUT_H_
-#define _SYS_CALLOUT_H_
+#ifndef _SYS__CALLOUT_H
+#define	_SYS__CALLOUT_H
 
 #include <sys/queue.h>
 
@@ -58,47 +58,4 @@ struct callout {
 	volatile int c_cpu;			/* CPU we're scheduled on */
 };
 
-#define	CALLOUT_LOCAL_ALLOC	0x0001 /* was allocated from callfree */
-#define	CALLOUT_ACTIVE		0x0002 /* callout is currently active */
-#define	CALLOUT_PENDING		0x0004 /* callout is waiting for timeout */
-#define	CALLOUT_MPSAFE		0x0008 /* callout handler is mp safe */
-#define	CALLOUT_RETURNUNLOCKED	0x0010 /* handler returns with mtx unlocked */
-#define	CALLOUT_SHAREDLOCK	0x0020 /* callout lock held in shared mode */
-
-struct callout_handle {
-	struct callout *callout;
-};
-
-#ifdef _KERNEL
-extern int ncallout;
-
-#define	callout_active(c)	((c)->c_flags & CALLOUT_ACTIVE)
-#define	callout_deactivate(c)	((c)->c_flags &= ~CALLOUT_ACTIVE)
-#define	callout_drain(c)	_callout_stop_safe(c, 1)
-void	callout_init(struct callout *, int);
-void	_callout_init_lock(struct callout *, struct lock_object *, int);
-#define	callout_init_mtx(c, mtx, flags)					\
-	_callout_init_lock((c), ((mtx) != NULL) ? &(mtx)->lock_object :	\
-	    NULL, (flags))
-#define	callout_init_rw(c, rw, flags)					\
-	_callout_init_lock((c), ((rw) != NULL) ? &(rw)->lock_object :	\
-	   NULL, (flags))
-#define	callout_pending(c)	((c)->c_flags & CALLOUT_PENDING)
-int	callout_reset_on(struct callout *, int, void (*)(void *), void *, int);
-#define	callout_reset(c, on_tick, fn, arg)				\
-    callout_reset_on((c), (on_tick), (fn), (arg), (c)->c_cpu)
-#define	callout_reset_curcpu(c, on_tick, fn, arg)			\
-    callout_reset_on((c), (on_tick), (fn), (arg), PCPU_GET(cpuid))
-int	callout_schedule(struct callout *, int);
-int	callout_schedule_on(struct callout *, int, int);
-#define	callout_schedule_curcpu(c, on_tick)				\
-    callout_schedule_on((c), (on_tick), PCPU_GET(cpuid))
-#define	callout_stop(c)		_callout_stop_safe(c, 0)
-int	_callout_stop_safe(struct callout *, int);
-void	callout_tick(void);
-int	callout_tickstofirst(int limit);
-extern void (*callout_new_inserted)(int cpu, int ticks);
-
 #endif
-
-#endif /* _SYS_CALLOUT_H_ */

Modified: head/sys/sys/callout.h
==============================================================================
--- head/sys/sys/callout.h	Tue Apr 26 10:02:15 2011	(r221058)
+++ head/sys/sys/callout.h	Tue Apr 26 11:39:56 2011	(r221059)
@@ -38,25 +38,7 @@
 #ifndef _SYS_CALLOUT_H_
 #define _SYS_CALLOUT_H_
 
-#include <sys/queue.h>
-
-struct lock_object;
-
-SLIST_HEAD(callout_list, callout);
-TAILQ_HEAD(callout_tailq, callout);
-
-struct callout {
-	union {
-		SLIST_ENTRY(callout) sle;
-		TAILQ_ENTRY(callout) tqe;
-	} c_links;
-	int	c_time;				/* ticks to the event */
-	void	*c_arg;				/* function argument */
-	void	(*c_func)(void *);		/* function to call */
-	struct lock_object *c_lock;		/* lock to handle */
-	int	c_flags;			/* state of this entry */
-	volatile int c_cpu;			/* CPU we're scheduled on */
-};
+#include <sys/_callout.h>
 
 #define	CALLOUT_LOCAL_ALLOC	0x0001 /* was allocated from callfree */
 #define	CALLOUT_ACTIVE		0x0002 /* callout is currently active */

Modified: head/sys/sys/taskqueue.h
==============================================================================
--- head/sys/sys/taskqueue.h	Tue Apr 26 10:02:15 2011	(r221058)
+++ head/sys/sys/taskqueue.h	Tue Apr 26 11:39:56 2011	(r221059)
@@ -35,10 +35,18 @@
 
 #include <sys/queue.h>
 #include <sys/_task.h>
+#include <sys/_callout.h>
 
 struct taskqueue;
 struct thread;
 
+struct timeout_task {
+	struct taskqueue *q;
+	struct task t;
+	struct callout c;
+	int    f;
+};
+
 /*
  * A notification callback function which is called from
  * taskqueue_enqueue().  The context argument is given in the call to
@@ -54,9 +62,15 @@ struct taskqueue *taskqueue_create(const
 int	taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
 				const char *name, ...) __printflike(4, 5);
 int	taskqueue_enqueue(struct taskqueue *queue, struct task *task);
+int	taskqueue_enqueue_timeout(struct taskqueue *queue,
+	    struct timeout_task *timeout_task, int ticks);
 int	taskqueue_cancel(struct taskqueue *queue, struct task *task,
 	    u_int *pendp);
+int	taskqueue_cancel_timeout(struct taskqueue *queue,
+	    struct timeout_task *timeout_task, u_int *pendp);
 void	taskqueue_drain(struct taskqueue *queue, struct task *task);
+void	taskqueue_drain_timeout(struct taskqueue *queue,
+	    struct timeout_task *timeout_task);
 void	taskqueue_free(struct taskqueue *queue);
 void	taskqueue_run(struct taskqueue *queue);
 void	taskqueue_block(struct taskqueue *queue);
@@ -79,6 +93,12 @@ void	taskqueue_thread_enqueue(void *cont
 	(task)->ta_context = (context);			\
 } while (0)
 
+void _timeout_task_init(struct taskqueue *queue,
+	    struct timeout_task *timeout_task, int priority, task_fn_t func,
+	    void *context);
+#define	TIMEOUT_TASK_INIT(queue, timeout_task, priority, func, context) \
+	_timeout_task_init(queue, timeout_task, priority, func, context);
+
 /*
  * Declare a reference to a taskqueue.
  */


More information about the svn-src-head mailing list