changeset 18:e0a1f92fcbc9

FIX: locking up due to multithreaded access during garbage collection.
author samer
date Mon, 06 Feb 2012 14:25:05 +0000
parents 084a1e7377a7
children 256016cddcba
files README cpp/plml.cpp prolog/plml.pl stdio_catcher
diffstat 4 files changed, 98 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/README	Sun Feb 05 18:20:54 2012 +0000
+++ b/README	Mon Feb 06 14:25:05 2012 +0000
@@ -302,6 +302,16 @@
 	other changes: removed ml_debug/1 - would rather have a flag to
 	enable printfs in the C++ source.
 
+2012-02-06
+	Yes, finally, got to the bottom of the locking up bug.
+	The problem was mutlithreaded access to the Matlab engine even when
+	making explicit calls only in one thread IF this is a secondary thread.
+	The reason? Garbage collection in the main thread. Have now added
+	a mutex to protect the Matlab engine (currently one global mutex).
+	If garbage collector is run on a workspace variable blob while the 
+	mutex is locked, it fails immediately without blocking. The blob
+	is not reclaimed but will be can be reclaimed the next time gc is run.
+
 -------------------------------------------------------------------------
 ACKNOWLEDGMENTS
 
--- a/cpp/plml.cpp	Sun Feb 05 18:20:54 2012 +0000
+++ b/cpp/plml.cpp	Mon Feb 06 14:25:05 2012 +0000
@@ -80,6 +80,7 @@
 #include <SWI-cpp.h>
 #include <stdio.h>
 #include <unistd.h>
+#include <pthread.h>
 #include "engine.h"
 
 
@@ -173,11 +174,12 @@
 /* MATLAB engine wrapper class */
 class eng {
 public:
+  const char *magic;
   Engine *ep;   // MATLAB API engine pointer
   atom_t id;    // atom associated with this engine
   char *outbuf; // buffer for textual output from MATLAB
         
-  eng(): ep(NULL), id(PL_new_atom("")), outbuf(NULL) {}
+  eng(): ep(NULL), id(PL_new_atom("")), outbuf(NULL) { magic="mleng"; }
         
   void open(const char *cmd, atom_t id) {
     ep=engOpen(cmd);
@@ -207,6 +209,17 @@
 static eng engines[MAXENGINES]; 
 // functor to be used to wrap array pointers
 
+static pthread_mutex_t EngMutex;
+#define LOCK pthread_mutex_lock(&EngMutex)
+#define UNLOCK pthread_mutex_unlock(&EngMutex)
+
+class lock {
+public:
+	lock() { pthread_mutex_lock(&EngMutex); }
+	~lock() { pthread_mutex_unlock(&EngMutex); }
+};
+
+
 extern "C" {
 // Functions for mx array atom type
   int mx_release(atom_t a);
@@ -302,6 +315,7 @@
   ws_blob.write   = 0; 
 
   mlerror=PL_new_functor(PL_new_atom("mlerror"),3);
+  pthread_mutex_init(&EngMutex,NULL);
 }
 
 void check(int rc) { if (!rc) printf("*** plml: Something failed.\n");}
@@ -405,15 +419,25 @@
 
 int ws_release(atom_t a) {
   struct wsvar *x=atom_to_wsvar(a);
+  int rc;
   // printf("."); fflush(stdout); // sweet brevity 
   
   char buf[16];
   sprintf(buf,"clear %s",x->name);
-  engEvalString(x->engine,buf);
-  x->name[0]=0;
-  x->engine=0;
+  if (pthread_mutex_trylock(&EngMutex)==0) {
+     rc=engEvalString(x->engine,buf) ? FALSE : TRUE; 
+	  pthread_mutex_unlock(&EngMutex);
+	} else {
+	//	printf("\n *** cannot release %s while engine locked ***\n",x->name);
+		rc=FALSE;
+	}
+
+	if (rc) {
+	  x->name[0]=0;
+	  x->engine=0;
+  } 
   
-  return TRUE;
+  return rc;
 }
 
 /* see mx_write */
@@ -482,19 +506,16 @@
 }
 
 
-static int raise_exception(const char *msg) {
+static int raise_exception(const char *msg, const char *loc, const char *info) {
   // printf("\n!! raising exception: %s\n",msg);
   // return FALSE;
  
   term_t ex = PL_new_term_ref();
-  int rc;
+  return PL_unify_term(ex, PL_FUNCTOR_CHARS, "error", 2,
+			 PL_FUNCTOR_CHARS, "plml_error", 3, PL_CHARS, msg, PL_CHARS, loc, PL_CHARS, info,
+			 PL_VARIABLE)
 
-  rc = PL_unify_term(ex, PL_FUNCTOR_CHARS, "error", 2,
-			 PL_FUNCTOR_CHARS, "plml_error", 1,
-			 PL_CHARS, msg,
-			 PL_VARIABLE);
-
-  return PL_raise_exception(ex);
+		  && PL_raise_exception(ex);
 }
 
 /*
@@ -511,7 +532,7 @@
   // that the name has not been used in the workspace
   class eng *engine;
   try { engine=findEngine(eng); }
-  catch (PlException ex) { return ex.plThrow(); }
+  catch (PlException &ex) { return ex.plThrow(); }
 
   //printf("-- Entering mlWSALLOC         \r"); fflush(stdout); 
   struct wsvar x;
@@ -521,18 +542,19 @@
 
 #ifdef ALT_WSALLOC
   // printf("-- mlWSAlloc: Calling uniquevar...       \r"); fflush(stdout); 
-  if (engEvalString(x.engine, "uniquevar([])")) {
-	  return raise_exception("mlWSAlloc: Cannot execute uniquevar");
+  {  lock l;
+	  if (engEvalString(x.engine, "uniquevar([])")) 
+		  return raise_exception("eval_failed","uniquevar","none");
   }
  
   if (strncmp(engine->outbuf,">> \nans =\n\nt_",13)!=0) {
      //printf("\n** mlWSAlloc: output buffer looks bad: '%s'\n",engine->outbuf);
-	  return raise_exception("mlWSAlloc: Bad output buffer after uniquevar.");
+	  return raise_exception("bad_output_buffer","uniquevar",engine->outbuf);
   }
 
 	unsigned int len=strlen(engine->outbuf+11)-2;
 	if (len+1>sizeof(x.name)) {
-	  return raise_exception("mlWSAlloc: uniquevar name is too long.");
+	  return raise_exception("name_too_long","uniquevar",engine->outbuf);
 	 }
 	memcpy(x.name,engine->outbuf+11,len);
 	x.name[len]=0;
@@ -580,23 +602,32 @@
 foreign_t mlWSGet(term_t var, term_t val) {
   try { 
     struct wsvar *x = term_to_wsvar(var);
+	 lock l;
+	 // class eng *engine=findEngine(PlTerm(PlAtom(x->id)));
+	 // char *before=strdup(engine->outbuf);
+	 //printf("-- mlWSGET: calling get variable...\n");
     mxArray *p = engGetVariable(x->engine, x->name);
+	 //printf("-- mlWSGET: returned from get variable.\n");
 	 if (p) return PL_unify_blob(val, (void **)&p, sizeof(p), &mx_blob);
 	 else {
-		 printf("\n!! mlWSGet: failed to get %s.\n",x->name);
-		 return raise_exception("mlWSGet: failed to get variable.");
+		 //printf("\n!! mlWSGet: failed to get %s.\n",x->name);
+		 //printf("\n!! mlWSGet: before buffer: %s.\n",before);
+		 //printf("\n!! mlWSGet: before after: %s.\n",engine->outbuf);
+		 //return raise_exception("get_variable_failed",before,engine->outbuf);
+		 return raise_exception("get_variable_failed","mlWSGET",x->name);
 	 }
   } catch (PlException &e) { 
     return e.plThrow(); 
   }
 }
 
+
 // Put an array back in Matlab workspace under given variable name
 foreign_t mlWSPut(term_t var, term_t val) {
   try { 
     struct wsvar *x=term_to_wsvar(var);
-    engPutVariable(x->engine, x->name, term_to_mx(val));
-    PL_succeed;
+	 lock   l;
+    return engPutVariable(x->engine, x->name, term_to_mx(val)) ? FALSE : TRUE;
   } catch (PlException &e) { 
     return e.plThrow(); 
   }
@@ -615,6 +646,7 @@
 	 const char *cmdstr=PlTerm(cmd);
 	 int	cmdlen=strlen(cmdstr);
 	 int	rc;
+	 lock l;
     
     // if string is very long, send it via local mxArray
     if (cmdlen>MAXCMDLEN) {
@@ -630,7 +662,7 @@
 	 	 if (eval_cmd==NULL) throw PlException("Failed to allocate memory for command");
 	 	 sprintf(eval_cmd, EVALFMT, cmdstr);
 	 	 //printf("-- Calling Matlab engine...                 \r"); fflush(stdout);
-	 	 rc=engEvalString(eng->ep,eval_cmd);
+		 rc=engEvalString(eng->ep,eval_cmd); 
 	 	 //printf("-- Returned from Matlab engine...            \r"); fflush(stdout);
 	 	 delete [] eval_cmd;
 	}
@@ -642,8 +674,7 @@
 	 // something is terribly wrong and we must throw an exeption to avoid
 	 // locking up in triserver.
     if (strncmp(eng->outbuf,">> #\n",5)!=0) {
-		 //printf("\n** mlExec: Output buffer looks bad: '%s'.\n",eng->outbuf);
-		 throw PlException("mlExec: output buffer looks bad.");
+		 throw PlException(PlCompound("bad_output_buffer",PlTermv("exec",eng->outbuf)));
 	 }
 	 // write whatever is in the output buffer now, starting after the "#\n"
     fputs(eng->outbuf+5,stdout);
@@ -658,8 +689,7 @@
 	 rc=engEvalString(eng->ep,"lasterr");
 	 if (rc) { throw PlException("mlExec: unable to execute lasterr"); }
 	 if (strncmp(eng->outbuf,">> \nans =",9)!=0) {
-		 //printf("\n** mlExec: Bad output buffer post lasterr: '%s'.\n",eng->outbuf);
-		 throw PlException("mlExec: bad output buffer post lasterr");
+		 throw PlException(PlCompound("bad_output_buffer",PlTermv("lasterr",eng->outbuf)));
 	 }
 
 	 if (strncmp(eng->outbuf+11,"     ''",7)!=0) {
--- a/prolog/plml.pl	Sun Feb 05 18:20:54 2012 +0000
+++ b/prolog/plml.pl	Mon Feb 06 14:25:05 2012 +0000
@@ -298,10 +298,10 @@
 	;	Exec='ssh /usr/local/bin/matlab'
 	),
 	(	member(debug(In,Out),Options)
-	-> format(atom(Exec1),'stdio_catcher ~w ~w nohup ~w',[In,Out,Exec])
+	-> format(atom(Exec1),'stdio_catcher ~w ~w ~w',[In,Out,Exec])
 	;	Exec1=Exec
 	),
-	format(atom(Cmd),'~w ~w',[Exec,Flags]),
+	format(atom(Cmd),'~w ~w',[Exec1,Flags]),
 	mlOPEN(Cmd,Id),
 	set_flag(ml(Id),open),
 	(	member(noinit,Options) -> true
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stdio_catcher	Mon Feb 06 14:25:05 2012 +0000
@@ -0,0 +1,29 @@
+#!/bin/sh
+echo "In stdio_catcher"
+
+function handle_term {
+	echo "### $(date): received SIGTERM" >> "$outputlog"
+	exit
+}
+
+function handle_int {
+	echo "### $(date): received SIGINT" >> "$outputlog"
+}
+
+function handle_hup {
+	echo "### $(date): received SIGHUP" >> "$outputlog"
+	exit
+}
+
+trap handle_int 2
+trap handle_term 15
+trap handle_hup 1
+
+inputlog="$1"
+outputlog="$2"
+shift 2
+
+echo "### $(date): INPUT - $@" >> "$inputlog"
+echo "### $(date): OUTPUT - $@" >> "$outputlog"
+nohup tee -a "$inputlog" | "$@" | nohup tee -a "$outputlog"
+#"$@"