Re: MonetDB: default - Low-level task scheduler.
There are a few things wrong with this code:
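- `sz = ((sz << 1) >> 1);` does *not* turn sz into a multiple of two (as the comment suggests). Shifting left and then right by one can at most discard the top bit (and left-shifting an int into the sign bit is undefined behavior), so for any sane value of sz this statement is a no-op.
- When you create joinable threads, you should join them. Here the same `tid` variable is overwritten for every worker and the workers loop forever, so they can never be joined.
- It's not a great idea to use assert to make sure that GDKzalloc succeeds (asserts are compiled out in optimized builds). It is better to return an error.
- The include of monet_options.h should be in the C file, not in the header file. The header will be included elsewhere, where monet_options.h is already included.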
On 2012-11-07 13:03, Martin Kersten wrote:
Changeset: 5ff3c16e865f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5ff3c16e865f
Added Files:
	gdk/gdk_mapreduce.c
	gdk/gdk_mapreduce.h
Modified Files:
	gdk/Makefile.ag
	monetdb5/modules/mal/groups.c
Branch: default
Log Message:
Low-level task scheduler. This module provides a lightweight map-reduce scheduler for multicore systems. A limited number of workers are initialized upfront, which take the tasks from a central queue. The header of these task descriptors should comply with the MRtask structure.
diffs (239 lines):
diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag --- a/gdk/Makefile.ag +++ b/gdk/Makefile.ag @@ -36,7 +36,7 @@ lib_gdk = { gdk_private.h gdk_delta.h gdk_logger.h gdk_posix.h \ gdk_system.h gdk_tm.h gdk_storage.h \ gdk_calc.c gdk_calc.h gdk_calc_compare.h gdk_calc_private.h \
gdk_aggr.c gdk_group.c \
bat.feps bat1.feps bat2.feps \ libbat.rc LIBS = ../common/options/libmoptions \gdk_aggr.c gdk_group.c gdk_mapreduce.c gdk_mapreduce.h \
diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c new file mode 100644 --- /dev/null +++ b/gdk/gdk_mapreduce.c @@ -0,0 +1,141 @@ +/*
- The contents of this file are subject to the MonetDB Public License
- Version 1.1 (the "License"); you may not use this file except in
- compliance with the License. You may obtain a copy of the License at
- Software distributed under the License is distributed on an "AS IS"
- basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
- License for the specific language governing rights and limitations
- under the License.
- The Original Code is the MonetDB Database System.
- The Initial Developer of the Original Code is CWI.
- Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
- Copyright August 2008-2012 MonetDB B.V.
- All Rights Reserved.
- */
+/*
- (co) Martin L. Kersten
- This module provides a lightweight map-reduce scheduler for multicore systems.
- A limited number of workers are initialized upfront, which take the tasks
- from a central queue. The header of these task descriptors should comply
- with the MRtask structure.
- */
+#include "monetdb_config.h" +#include "gdk.h" +#include "gdk_mapreduce.h"
+/* each entry in the queue contains a list of tasks */ +typedef struct MRQUEUE {
- MRtask **tasks;
- int index; /* next available task */
- int size; /* number of tasks */
+} MRqueue;
+static MRqueue *mrqueue; +static int mrqsize= -1; /* size of queue */ +static int mrqlast= -1; +static MT_Lock mrqlock; /* its a shared resource, ie we need locks */ +static MT_Sema mrqsema; /* threads wait on empty queues */
+static void MRworker(void *);
+static void +MRqueueCreate(int sz) +{
- int i;
- MT_Id tid;
- MT_lock_init(&mrqlock, "q_create");
- MT_lock_set(&mrqlock,"q_create");
- MT_sema_init(&mrqsema, 0, "q_create");
- sz = ((sz << 1) >> 1); /* we want a multiple of 2 */
- mrqueue = (MRqueue*)GDKzalloc(sizeof(MRqueue) *sz);
- assert(mrqueue);
- mrqsize = sz;
- mrqlast = 0;
- /* create a worker thread for each core as specified as system parameter*/
- for ( i =0; i < GDKnr_threads; i++)
MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE);
- MT_lock_unset(&mrqlock,"q_create");
+}
+static void +MRenqueue(int taskcnt, MRtask **tasks) +{
- assert(taskcnt > 0);
- MT_lock_set(&mrqlock, "mrqlock");
- if (mrqlast == mrqsize) {
mrqsize <<= 1;
mrqueue = (MRqueue*) GDKrealloc(mrqueue, sizeof(MRqueue) * mrqsize);
- }
- mrqueue[mrqlast].index = 0;
- mrqueue[mrqlast].tasks = tasks;
- mrqueue[mrqlast].size = taskcnt;
- mrqlast++;
- MT_lock_unset(&mrqlock, "mrqlock");
- /* a task list is added for consumption*/
- while (taskcnt-- > 0)
MT_sema_up(&mrqsema, "mrqsema");
+}
+static MRtask * +MRdequeue(void) +{
- MRtask *r = NULL;
- int idx;
- MT_sema_down(&mrqsema, "mrqsema");
- assert(mrqlast);
- MT_lock_set(&mrqlock, "mrqlock");
- if (mrqlast > 0) {
idx = mrqueue[mrqlast-1].index;
r = mrqueue[mrqlast-1].tasks[idx++];
if ( mrqueue[mrqlast-1].size == idx)
mrqlast--;
else
mrqueue[mrqlast-1].index = idx;
- }
- MT_lock_unset(&mrqlock, "mrqlock");
- assert(r);
- return r;
+}
+static void +MRworker(void * arg) +{
- MRtask *task;
- (void) arg;
- do{
task= MRdequeue();
(task->cmd)(task);
MT_sema_up(task->sema, "mrqsema");
- } while (1);
+}
+/* schedule the tasks and return when all are done */ +void +MRschedule(int taskcnt, void **arg, void (*cmd)(void*p)) +{
- int i;
- MT_Sema sema;
- MRtask **task = (MRtask**) arg;
- if ( mrqueue == 0)
MRqueueCreate(1024);
- MT_sema_init(&sema, 0, "q_create");
- for ( i= 0; i < taskcnt; i++){
task[i]->sema = & sema;
task[i]->cmd = cmd;
- }
- MRenqueue(taskcnt,task);
- /* waiting for all report result */
- for ( i= 0; i < taskcnt; i++)
MT_sema_down(&sema, "mrqsema");
+} diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h new file mode 100644 --- /dev/null +++ b/gdk/gdk_mapreduce.h @@ -0,0 +1,32 @@ +/*
- The contents of this file are subject to the MonetDB Public License
- Version 1.1 (the "License"); you may not use this file except in
- compliance with the License. You may obtain a copy of the License at
- Software distributed under the License is distributed on an "AS IS"
- basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
- License for the specific language governing rights and limitations
- under the License.
- The Original Code is the MonetDB Database System.
- The Initial Developer of the Original Code is CWI.
- Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
- Copyright August 2008-2012 MonetDB B.V.
- All Rights Reserved.
- */
+#ifndef _GDK_MAPREDUCE_H_ +#define _GDK_MAPREDUCE_H_
+#include <monet_options.h>
+typedef struct{
- MT_Sema *sema; /* micro scheduler handle */
- void (*cmd)(void *); /* the function to be executed */
+}MRtask;
+gdk_export void MRschedule(int taskcnt, void **arg, void (*cmd)(void *p));
+#endif /* _GDK_MAPREDUCE_H_ */ diff --git a/monetdb5/modules/mal/groups.c b/monetdb5/modules/mal/groups.c --- a/monetdb5/modules/mal/groups.c +++ b/monetdb5/modules/mal/groups.c @@ -66,11 +66,15 @@ GRPmulticolumngroup(Client cntxt, MalBlk /* sort order may have influences */ /* SF100 Q16 showed < ordering is 2 times faster as > ordering */ for ( i = 3; i< pci->argc; i++)
- for ( j = i+1; j<pci->argc; j++)
- if ( sizes[j] < sizes[i]){
l = sizes[j]; sizes[j]= sizes[i]; sizes[i]= l;
bi = bid[j]; bid[j]= bid[i]; bid[i]= bi;
- }
for ( j = i+1; j<pci->argc; j++)
if ( sizes[j] < sizes[i]){
l = sizes[j];
sizes[j]= sizes[i];
sizes[i]= l;
bi = bid[j];
bid[j]= bid[i];
bid[i]= bi;
/* for (i=2; i<pci->argc; i++) mnstr_printf(cntxt->fdout,"# after [%d] "LLFMT"\n",i, sizes[i]); */}
@@ -82,8 +86,6 @@ GRPmulticolumngroup(Client cntxt, MalBlk i = 4; if (msg == MAL_SUCCEED && pci->argc > 4 ) do {
if (*ext)
/* early break when there are as many groups as histogram entries */ b = BATdescriptor(*hist); if ( b ){BBPdecref(*ext, TRUE);
@@ -91,8 +93,8 @@ GRPmulticolumngroup(Client cntxt, MalBlk BBPreleaseref(*hist); if ( j) break; }
if (*hist)
BBPdecref(*hist, TRUE);
BBPdecref(*ext, TRUE);
/* (grp,ext,hist) := group.subgroupdone(arg,grp) */ oldgrp= *grp;BBPdecref(*hist, TRUE);
checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list
participants (1)
-
Sjoerd Mullender