/*
* $AIST_Release: 4.2.4 $
* $AIST_Copyright:
* Copyright 2003, 2004, 2005, 2006 Grid Technology Research Center,
* National Institute of Advanced Industrial Science and Technology
* Copyright 2003, 2004, 2005, 2006 National Institute of Informatics
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* $
* $RCSfile: NgGrpcHandle.java,v $ $Revision: 1.98 $ $Date: 2005/10/04 06:04:15 $
*/
package org.apgrid.grpc.ng;
import java.util.List;
import java.util.Properties;
import org.apgrid.grpc.ng.info.*;
import org.apgrid.grpc.ng.protocol.ProtCancelSessionRequest;
import org.apgrid.grpc.ng.protocol.ProtExitExeRequest;
import org.apgrid.grpc.ng.protocol.ProtInvokeCallbackNotify;
import org.apgrid.grpc.ng.protocol.ProtInvokeSessionRequest;
import org.apgrid.grpc.ng.protocol.ProtPullbackSessionReply;
import org.apgrid.grpc.ng.protocol.ProtPullbackSessionRequest;
import org.apgrid.grpc.ng.protocol.ProtQueryExeStatusReply;
import org.apgrid.grpc.ng.protocol.ProtQueryExeStatusRequest;
import org.apgrid.grpc.ng.protocol.ProtQueryFunctionInfoReply;
import org.apgrid.grpc.ng.protocol.ProtQueryFunctionInfoRequest;
import org.apgrid.grpc.ng.protocol.ProtResetExeRequest;
import org.apgrid.grpc.ng.protocol.ProtResumeSessionRequest;
import org.apgrid.grpc.ng.protocol.ProtSuspendSessionRequest;
import org.apgrid.grpc.ng.protocol.ProtTransferArgumentRequest;
import org.apgrid.grpc.ng.protocol.ProtTransferCallbackArgumentReply;
import org.apgrid.grpc.ng.protocol.ProtTransferCallbackArgumentRequest;
import org.apgrid.grpc.ng.protocol.ProtTransferCallbackResultRequest;
import org.apgrid.grpc.ng.protocol.ProtTransferResultReply;
import org.apgrid.grpc.ng.protocol.ProtTransferResultRequest;
import org.apgrid.grpc.util.*;
import org.gridforum.gridrpc.GrpcException;
/**
* Provides interfaces for Grpc*Handle class.
* This class is used internally, you should use
* NgGrpcFunctionHandle or NgGrpcObjectHandle.
*
* And provides variables of status code.
* It's used in {@link NgGrpcFunctionHandle#getLocalStatus()} and
* {@link NgGrpcObjectHandle#getLocalStatus()}.
*/
public class NgGrpcHandle implements Cloneable {
private NgGrpcClient context;
private int jobID;
private int executableID;
private String hostName;
private RemoteMachineInfo remoteMachineInfo;
private RemoteClassPathInfo classPathInfo;
private String className;
private RemoteClassInfo remoteClassInfo;
private NgGrpcJob job;
private NgGrpcExecInfo execInfo;
private SessionInformation sessionInfo;
private GrpcTimer timer;
private int sessionID;
private Integer status;
private CommunicationManager commManager;
private String methodName;
private int methodID;
private RemoteMethodInfo remoteMethodInfo;
private NgLog ngLog;
private int jobCount;
private Properties propGrpcHandleAttr;
private int[] intArguments;
private boolean isCanceled;
private boolean isLocked;
private long jobStartTimeout;
/* for callback */
private List callbackFunc;
private List callbackFuncInfo;
/* protocol version */
int versionMajor;
int versionMinor;
int versionPatch;
/* status of Ninf-G Client */
/** not available */
public final int CLIENTSTATE_NONE = 0x00;
/** idling */
public final int CLIENTSTATE_IDLE = 0x01;
/** initializing handle */
public final int CLIENTSTATE_INIT = 0x02;
/** invoking session */
public final int CLIENTSTATE_INVOKE_SESSION = 0x03;
/** sending arguments */
public final int CLIENTSTATE_TRANSARG = 0x05;
/** waiting for function complete */
public final int CLIENTSTATE_WAIT = 0x07;
/** received notify of function complete */
public final int CLIENTSTATE_COMPLETE_CALCULATING = 0x08;
/** transforming results */
public final int CLIENTSTATE_TRANSRES = 0x09;
/** pulling back session */
public final int CLIENTSTATE_PULLBACK = 0x0b;
/** suspending session */
public final int CLIENTSTATE_SUSPEND = 0x0c;
/** resuming session */
public final int CLIENTSTATE_RESUME = 0x0e;
/** disposing handle */
public final int CLIENTSTATE_DISPOSE = 0x0f;
/** resetting handle */
public final int CLIENTSTATE_RESET = 0x10;
/** canceling session */
public final int CLIENTSTATE_CANCEL = 0x11;
/** executing callback */
public final int CLIENTSTATE_INVOKE_CALLBACK = 0x12;
/**
* Creates NgGrpcHandle.
*/
protected NgGrpcHandle() {
/* nothing will be done */
}
/**
* Creates NgGrpcHandle without any server information.
* Default server information(described at 1st < SERVER_INFO > section
* in a configuration file) will be used as server.
*
* @param className a name of RemoteFunction/RemoteObject.
* @param context NgGrpcClient.
* @param jobID ID of Job.
* @param executableID ID of executable.
* @param jobCount a number of Jobs.
* @throws GrpcException if failed to create handle.
*/
protected NgGrpcHandle(String className, NgGrpcClient context,
int jobID, int executableID, int jobCount) throws GrpcException {
NgInformationManager infoMgr = context.getNgInformationManager();
Properties tmpProp = null;
try {
infoMgr.lockInformationManager();
tmpProp =
getDefaultServerProperties(infoMgr.getDefaultRemoteMachineProperties(className));
} finally {
infoMgr.unlockInformationManager();
}
initialize(className, tmpProp, context, jobID, executableID, jobCount);
}
/**
* Creates NgGrpcHandle.
* Attribute variables of the server must set in prop.
*
* @param className a name of RemoteFunction/RemoteObject.
* @param prop attribute variables of the server.
* @param context NgGrpcClient.
* @param jobID ID of Job.
* @param executableID ID of executable.
* @param jobCount a number of Jobs.
* @throws GrpcException if failed to create handle.
* @see NgGrpcHandleAttr
*/
protected NgGrpcHandle(String className, Properties prop,
NgGrpcClient context, int jobID, int executableID, int jobCount)
throws GrpcException {
initialize(className, prop, context, jobID, executableID, jobCount);
}
/**
* Initialize Handle.
*
* @param className
* @param prop
* @param context
* @param jobID
* @param executableID
* @param jobCount
* @throws GrpcException
*/
private void initialize(String className, Properties prop,
NgGrpcClient context, int jobID, int executableID, int jobCount)
throws GrpcException {
this.context = context;
this.jobID = jobID;
this.executableID = executableID;
this.jobCount = jobCount;
this.execInfo = new NgGrpcExecInfo();
this.timer = new GrpcTimer();
this.intArguments = null;
this.isCanceled = false;
this.isLocked = false;
/* init status */
this.status = new Integer(CLIENTSTATE_NONE);
/* get name of server */
if (prop == null) {
throw new NgInitializeGrpcHandleException(
"Invalid ServerInformation.");
}
this.hostName = (String) prop.get(NgGrpcHandleAttr.KEY_HOSTNAME);
/* If hostname is not specified, use default server */
if (this.hostName == null) {
NgInformationManager infoMgr = context.getNgInformationManager();
Properties tmpProp = null;
try {
infoMgr.lockInformationManager();
tmpProp = infoMgr.getDefaultRemoteMachineProperties(className);
} finally {
infoMgr.unlockInformationManager();
}
if (tmpProp == null) {
throw new NgInitializeGrpcHandleException(
"Server is not specified.");
}
this.hostName =
(String) tmpProp.get(NgGrpcHandleAttr.KEY_HOSTNAME);
}
/* get name of class */
if (prop.get(NgGrpcHandleAttr.KEY_CLASSNAME) != null) {
this.className = (String) prop.get(NgGrpcHandleAttr.KEY_CLASSNAME);
} else {
this.className = className;
}
/* check name of host and class */
if ((hostName == null) || (className == null)) {
throw new NgInitializeGrpcHandleException(
"Invalid hostname or classname");
}
/* set attribute for GrpcHandle */
this.propGrpcHandleAttr = prop;
/* get NgLog */
this.ngLog = context.getNgLog();
/* activate GrpcHandle */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#initialize(): activate handle.");
activate();
/* set status */
setStatus("", CLIENTSTATE_IDLE);
}
/**
* Initialize NgGrpcHandle.
* Searches information of the server and RemoteFunction/RemoteObject.
* And invokes RemoteFunction/RemoteObject.
* Then starts Thread for CommunicationManager and Timer of HeartBeat.
*
* @throws GrpcException if it failed to initialize.
*/
private void activate() throws GrpcException {
/* set status */
setStatus("activate", CLIENTSTATE_INIT);
/* lookup information */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#activate(): search information.");
/* get RemoteMachineInfo */
RemoteClassPathInfo rcPath = null;
timer.start();
NgInformationManager infoManager = context.getNgInformationManager();
try {
infoManager.lockInformationManager();
rcPath = infoManager.getClassPathInfo(hostName, className);
if (rcPath == null) {
throw new NgInitializeGrpcHandleException(
"can't get RemoteClassPathInfo.");
}
remoteMachineInfo = infoManager.getRemoteMachineInfoCopy(hostName);
} finally {
infoManager.unlockInformationManager();
}
/* set time to lookup information */
execInfo.setLookupServerInfoTime(timer.getElapsedTime());
/* get RemoteClassInfo */
timer.start();
try {
infoManager.lockInformationManager();
remoteClassInfo = (RemoteClassInfo)infoManager.getRemoteClassInfo(className);
} finally {
infoManager.unlockInformationManager();
}
/* set time to lookup information */
execInfo.setLookupClassInfoTime(timer.getElapsedTime());
/* set attribute of Handle */
if (this.propGrpcHandleAttr.containsKey(NgGrpcHandleAttr.KEY_MPI_NCPUS) == true) {
remoteMachineInfo.resetNumCPUs();
this.propGrpcHandleAttr =
NgGrpcHandleAttr.convertNumOfCPUs(this.propGrpcHandleAttr);
}
remoteMachineInfo.overwriteParameter(this.propGrpcHandleAttr);
/* invoke GrpcJob */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#activate(): invoke Ninf-G Executable.");
/* set timeout */
String jobStartTimeout =
(String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_JOB_STARTTIMEOUT);
this.jobStartTimeout = Long.parseLong(jobStartTimeout) * 1000;
/* check the timeout variable */
if (this.jobStartTimeout < 0) {
throw new NgInitializeGrpcHandleException("job_startTimeout is invalid.");
}
/* set jobType */
String jobType = null;
if (remoteClassInfo != null) {
jobType = remoteClassInfo.getBackend();
}
if (rcPath != null) {
String jobTypeInClassPathInfo =
(String) rcPath.get(RemoteClassPathInfo.KEY_CLASS_PATH_BACKEND);
if (jobTypeInClassPathInfo != null &&
(jobTypeInClassPathInfo.equals(RemoteMachineInfo.VAL_BACKEND_MPI) ||
jobTypeInClassPathInfo.equals(RemoteMachineInfo.VAL_BACKEND_BLACS))) {
jobType = jobTypeInClassPathInfo;
}
}
/* invoke JOB */
timer.start();
job = new NgGrpcJob(context, remoteMachineInfo,
className, jobID, jobType, jobCount);
/* set time to invoke */
execInfo.setInvokeTime(timer.getElapsedTime());
/* start CommunicationManager, this will wait for active */
startCommunicationManager();
/* get protocol version */
this.versionMajor = commManager.getVersionMajor();
this.versionMinor = commManager.getVersionMinor();
this.versionPatch = commManager.getVersionPatch();
}
/**
* Starts CommunicationManager to manager send/receive Protocol.
*
* @throws GrpcException if failed to start CommunicationManager.
*/
protected void startCommunicationManager() throws GrpcException {
/* start CommunicationManager */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#startCommunicationManager () : start CommunicationManager.");
try {
commManager = new CommunicationManager(context, this);
} catch (GrpcException e) {
job.setRequiredJobCancel();
job.incrementExitCount();
throw e;
}
new Thread(commManager).start();
}
/**
* Cancels a current session.
*
* @throws GrpcException if failed to cancel.
*/
protected void cancel() throws GrpcException {
cancelSession();
pullbackSession();
}
/**
* Sends request of cancel and wait for the reply.
*
* @throws GrpcException if it failed to cancel.
*/
private void cancelSession() throws GrpcException {
try {
/* lock the handle */
lockHandle();
/* set cancel flag */
this.isCanceled = true;
int status = getLocalStatus();
if ((status == CLIENTSTATE_NONE) || (status == CLIENTSTATE_INIT) ||
(status == CLIENTSTATE_TRANSRES)) {
/* nothing will be done */
return;
} else if (status == CLIENTSTATE_IDLE) {
/* reset isCanceled flag */
this.isCanceled = false;
return;
}
/* send and receive Cancel Protocol */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#cancelSession(): cancel Session.");
Protocol cancelRequest = new ProtCancelSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol cancelReply =
commManager.sendProtocol(cancelRequest);
/* check result of Reply */
checkResult(cancelReply);
/* set status */
setStatus("cancelSession", CLIENTSTATE_CANCEL);
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Sends request of dispose and wait for the reply.
* And stops CommunicationManager and Timer of HeartBeat.
*
* @throws GrpcException if it failed to dispose.
*/
protected void dispose() throws GrpcException {
try {
/* lock the handle */
lockHandle();
if (getLocalStatus() != CLIENTSTATE_IDLE) {
/* invalid status */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_ERROR,
this,
"NgGrpcHandle#dispose(): invalid status.");
throw new NgFinalizeGrpcHandleException("failed to dispose");
}
/* dispose handle */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#dispose(): dispose Handle.");
Protocol exitExeRequest = new ProtExitExeRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol exitExeReply =
commManager.sendProtocol(exitExeRequest);
/* check result of Reply */
checkResult(exitExeReply);
} catch (GrpcException e) {
/* set requiredJobCancel */
job.setRequiredJobCancel();
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#dispose(): set Require JobCancel .");
} finally {
/* set status */
setStatus("dispose", CLIENTSTATE_NONE);
/* increment exit count */
job.incrementExitCount();
/* remove this from handle list */
context.removeHandle(this);
/* unlock the handle */
unlockHandle();
}
/* wait for done */
String jobStopTimeout =
(String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_JOB_STOPTIMEOUT);
long stopTimeout = Long.parseLong(jobStopTimeout) * 1000;
if (stopTimeout < 0) {
job.waitForDone();
} else if (stopTimeout == 0) {
/* don't wait JOB DONE */
} else {
if (job.waitForDone(stopTimeout) != 0) {
/* timeout */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_WARN,
this,
"NgGrpcHandle#dispose(): timeout to dispose.");
//throw new NgFinalizeGrpcHandleException("failed to dispose");
}
}
/* dispose JOB */
job.dispose();
}
/**
* Checks if this handle doesn't handle any jobs.
*
* @return true if it's idle, false otherwise.
*/
protected boolean isIdle() {
if (getLocalStatus() == CLIENTSTATE_IDLE) {
return true;
} else {
return false;
}
}
/**
* Sends request of execInfo and wait for the reply.
*
* @return status code of a RemoteExecutable.
* if Ninf-G Executable is not available, returns -1.
* @throws GrpcException if it's failed to get status code.
*/
protected int getExeInfo() throws GrpcException {
int status = getLocalStatus();
if ((status == CLIENTSTATE_NONE) || (status == CLIENTSTATE_INIT)) {
/* nothing will be done */
return -1;
}
/* get Information of Ninf-G Executable */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#getInfo(): get Information.");
Protocol queryExeStatusRequest = new ProtQueryExeStatusRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
ProtQueryExeStatusReply queryExeStatusReply =
(ProtQueryExeStatusReply)
commManager.sendProtocol(queryExeStatusRequest);
/* check result of Reply */
checkResult(queryExeStatusReply);
return queryExeStatusReply.getExeInfo();
}
/**
* Gets a status code of this handle.
*
* @return a status code of this handle.
*/
protected int getLocalStatus() {
return status.intValue();
}
/**
* Generates ID of session.
*
* @return ID of session.
*/
private int generateSessionID() {
sessionID += 1;
return sessionID;
}
/**
* Sends request of pullbackSession and wait for the reply.
*
* @throws GrpcException if it failed to pull back a session.
*/
protected void pullbackSession() throws GrpcException {
try {
/* lock the handle */
lockHandle();
int status = getLocalStatus();
if (status != CLIENTSTATE_CANCEL) {
/* invalid status */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_ERROR,
this,
"NgGrpcHandle#pullbackSession(): invalid status.");
throw new NgExecRemoteMethodException("failed to pullback");
}
/* pullback session from Ninf-G Executable */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#pullbackSession(): Pullback Session.");
Protocol pullbackSessionRequest = new ProtPullbackSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol pullbackSessionReply =
commManager.sendProtocol(pullbackSessionRequest);
/* check result of Reply */
checkResult(pullbackSessionReply);
/* set status */
setStatus("pullbackSession", CLIENTSTATE_IDLE);
/* reset cancel flag */
this.isCanceled = false;
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Sends request of resetExecutable and wait for the reply.
*
* @throws GrpcException if it failed to reset.
*/
protected void resetExecutable() throws GrpcException {
try {
/* lock the handle */
lockHandle();
int status = getLocalStatus();
if ((status == CLIENTSTATE_NONE) || (status == CLIENTSTATE_INIT) ||
(status == CLIENTSTATE_IDLE)) {
/* nothing will be done */
return;
}
/* reset Ninf-G Executable */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#resetExecutable(): Reset Session.");
Protocol resetExeRequest = new ProtResetExeRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol resetExeReply =
commManager.sendProtocol(resetExeRequest);
/* check result of Reply */
checkResult(resetExeReply);
/* set status */
setStatus("resetExecutable", CLIENTSTATE_IDLE);
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Sends request of resetExecutable and wait for the reply.
*
* @throws GrpcException if it failed to resume.
*/
protected void resumeSession() throws GrpcException {
try {
/* lock the handle */
lockHandle();
int status = getLocalStatus();
if ((status != CLIENTSTATE_WAIT) && (status != CLIENTSTATE_SUSPEND)) {
/* invalid status */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_ERROR,
this,
"NgGrpcHandle#resumeSession(): invalid status.");
throw new NgExecRemoteMethodException("failed to resume");
}
/* resume Ninf-G Executable */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#resumeSession(): Resume Session.");
Protocol resumeSessionRequest = new ProtResumeSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol resumeSessionReply =
commManager.sendProtocol(resumeSessionRequest);
/* check result of Reply */
checkResult(resumeSessionReply);
/* set status */
setStatus("resumeSession", CLIENTSTATE_WAIT);
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Starts a session with arguments.
*
* @param args arguments for a RemoteFunction/RemoteMethod.
* @return {@link org.gridforum.gridrpc.GrpcExecInfo}
* @throws GrpcException if it failed to complete a session.
*/
protected NgGrpcExecInfo startSession(
Properties sessionAttr, List args) throws GrpcException {
return startSession((String)null, sessionAttr, args);
}
/**
* Starts a session with a name of method and arguments.
* If method name is not supplied, then call default method.
*
* @param methodName a name of a method.
* @param args arguments of RemoteFunction/RemoteMethod.
* @return {@link org.gridforum.gridrpc.GrpcExecInfo}
* @throws GrpcException if it failed to complete a session.
*/
protected NgGrpcExecInfo startSession(
String methodName, Properties sessionAttr, List args) throws GrpcException {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_DEBUG,
this,
"NgGrpcHandle#startSession()");
/* set methodName */
this.methodName = methodName;
/* if there is no class information, do search */
if (remoteClassInfo == null) {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#startSession() : get name of Method.");
NgInformationManager infoMgr = context.getNgInformationManager();
try {
infoMgr.lockInformationManager();
if (methodName == null) {
remoteMethodInfo = infoMgr.getRemoteMethodInfo(className);
} else {
remoteMethodInfo =
infoMgr.getRemoteMethodInfo(className, methodName);
}
/* Maybe you can get RemoteClassInfo from RemoteExecutable */
remoteClassInfo = infoMgr.getRemoteClassInfo(className);
} catch (GrpcException e) {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#startSession() : can't get MethodInfo from InformationManager.");
} finally {
infoMgr.unlockInformationManager();
}
} else {
remoteMethodInfo = remoteClassInfo.getRemoteMethodInfo(methodName);
}
/* if failed to get RemoteClassInfo, get it from RemoteExecutable */
if (remoteMethodInfo == null) {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#startSession() : get RemoteClassInfo from Executable.");
/* get RemoteClassInfo from RemoteExecutable */
remoteClassInfo = getRemoteClassInfo();
if (methodName == null) {
remoteMethodInfo =
context.getNgInformationManager().getRemoteMethodInfo(
className);
} else {
remoteMethodInfo =
context.getNgInformationManager().getRemoteMethodInfo(
className, methodName);
}
}
/* Do calculation */
return doSession(sessionAttr, args);
}
/**
* Starts a session(Invokes a RemoteFunction/RemoteMethod, Waits for complete it).
*
* @param args arguments of RemoteFunction/RemoteMethod.
* @return {@link org.gridforum.gridrpc.GrpcExecInfo}
* @throws GrpcException if it failed to complete a session.
*/
private NgGrpcExecInfo doSession(Properties sessionAttr, List args) throws GrpcException {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_DEBUG,
this,
"NgGrpcHandle#doSession()");
/* set timeout SessionCancel */
String sessionTimeout = null;
if ((sessionAttr != null) &&
(sessionAttr.get(NgGrpcSessionAttr.KEY_SESSION_TIMEOUT) != null)) {
sessionTimeout =
(String)sessionAttr.get(NgGrpcSessionAttr.KEY_SESSION_TIMEOUT);
} else {
sessionTimeout =
(String) remoteMachineInfo.getRemoteClassPath(
className).get(RemoteClassPathInfo.KEY_CLASS_PATH_SESSION_TIMEOUT);
}
if (sessionTimeout != null) {
commManager.setSessionTimeout(Integer.parseInt(sessionTimeout));
}
/* reset intArguments */
this.intArguments = null;
/* generate sessionID */
int sessionID = generateSessionID();
/* set methodID */
this.methodID = remoteClassInfo.getRemoteMethodID(methodName);
/* ----- Invoke Session ----- */
try {
/* lock the handle */
lockHandle();
if ((this.isCanceled == true) || (getLocalStatus() != CLIENTSTATE_IDLE)) {
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): unexpected status, can't continue session");
this.isCanceled = false;
return null;
}
/* reset Cond of Complete Function */
commManager.resetCompleteFunction();
/* check timeout of session */
commManager.checkSessionTimeout();
/* Invoke Session */
Protocol invokeSessionRequest = new ProtInvokeSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID, methodID,
versionMajor, versionMinor, versionPatch);
Protocol invokeSessionReply =
commManager.sendProtocol(invokeSessionRequest);
/* check result of Reply */
checkResult(invokeSessionReply);
setStatus("doSession", CLIENTSTATE_INVOKE_SESSION);
} finally {
/* unlock the handle */
unlockHandle();
}
/* transform arg */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): translate argument data.");
CallContext callContext = new CallContext(remoteMethodInfo, args);
/* set information about callback */
callbackFunc = callContext.getCallbackFuncList();
callbackFuncInfo = callContext.getCallbackFuncInfoList();
this.intArguments = callContext.getIntArguments();
/* create SessionInformation */
this.sessionInfo =
new SessionInformation(callContext.getNumParams(), callbackFunc.size());
/* send Arguments */
boolean compressEnable = remoteMachineInfo.get(
RemoteMachineInfo.KEY_COMPRESS).equals(
RemoteMachineInfo.VAL_COMPRESS_ZLIB);
int compressThreshold =
new Integer((String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_COMPRESS_THRESHOLD)).intValue();
int blockSize =
new Integer((String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_BLOCK_SIZE)).intValue();
/* ----- Transform Arguments ----- */
try {
/* lock the handle */
lockHandle();
if (getLocalStatus() != CLIENTSTATE_INVOKE_SESSION) {
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): unexpected status, can't continue session");
return null;
}
/* check timeout of session */
commManager.checkSessionTimeout();
/* Transform arguments */
timer.start();
Protocol transferArgumentRequest = new ProtTransferArgumentRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID, callContext,
commManager.getEncodeTypes(), compressEnable,
compressThreshold, blockSize,
versionMajor, versionMinor, versionPatch);
Protocol transferArgumentReply =
commManager.sendProtocol(transferArgumentRequest);
this.sessionInfo.setTransferArgumentRealTime(timer.getElapsedTime());
/* check result of Reply */
checkResult(transferArgumentReply);
/* put information about compress */
if (compressEnable == true) {
ProtTransferArgumentRequest protTransArg =
(ProtTransferArgumentRequest)transferArgumentRequest;
if (protTransArg.getOriginalDataTotalLength() != 0) {
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): Before compress Data = " +
protTransArg.getOriginalDataTotalLength() + " Bytes, " +
"After compress Data = " +
protTransArg.getConvertedDataTotalLength() + " Bytes, " +
"The rate of compression = " +
(((double)protTransArg.getConvertedDataTotalLength() / (double)protTransArg.getOriginalDataTotalLength()) * 100.0) +
"%.");
}
/* set CompressInformation */
this.sessionInfo.setCompressionInformation(
protTransArg.getOriginalDataLength(),
protTransArg.getConvertedDataLength(),
protTransArg.getConvertRealTime(),
protTransArg.getConvertCPUTime());
}
/* set status */
setStatus("doSession", CLIENTSTATE_WAIT);
} finally {
/* unlock the handle */
unlockHandle();
}
/* check timeout of session */
commManager.checkSessionTimeout();
/* wait for the end of calculation */
timer.start();
commManager.waitCompleteFunction();
this.sessionInfo.setCalculationRealTime(timer.getElapsedTime());
/* ----- Transform Result ----- */
Protocol transferResultReply = null;
try {
/* lock the handle */
lockHandle();
if (getLocalStatus() != CLIENTSTATE_WAIT) {
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): unexpected status, can't continue session");
return null;
}
/* receive Results */
timer.start();
Protocol transferResultRequest = new ProtTransferResultRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch, callContext);
transferResultReply = commManager.sendProtocol(transferResultRequest);
this.sessionInfo.setTransferResultRealTime(timer.getElapsedTime());
/* check result of Reply */
checkResult(transferResultReply);
/* put information about compress */
if (compressEnable == true) {
ProtTransferResultReply protTransRes =
(ProtTransferResultReply)transferResultReply;
if (protTransRes.getOriginalDataTotalLength() != 0) {
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): Before compress Data = " +
protTransRes.getOriginalDataTotalLength() + " Bytes, " +
"After compress Data = " +
protTransRes.getConvertedDataTotalLength() + " Bytes, " +
"The rate of compression = " +
(((double)protTransRes.getConvertedDataTotalLength() / (double)protTransRes.getOriginalDataTotalLength()) * 100.0) +
"%.");
}
/* set CompressInformation */
this.sessionInfo.setDecompressionInformation(
protTransRes.getOriginalDataLength(),
protTransRes.getConvertedDataLength(),
protTransRes.getConvertRealTime(),
protTransRes.getConvertCPUTime());
}
/* set status */
setStatus("doSession", CLIENTSTATE_TRANSRES);
} finally {
/* unlock the handle */
unlockHandle();
}
/* set result data into CallContext */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): transform result data.");
ProtTransferResultReply resultReply =
(ProtTransferResultReply)transferResultReply;
callContext.setResultData(resultReply.getResultData());
/* translate Results */
callContext.transformResult(args);
/* ----- Pullback Session ----- */
try {
/* lock the handle */
lockHandle();
if ((isCanceled == true) || (getLocalStatus() != CLIENTSTATE_TRANSRES)) {
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): unexpected status, can't continue session");
return null;
}
Protocol pullbackSessionRequest = new ProtPullbackSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol pullbackSessionReply =
commManager.sendProtocol(pullbackSessionRequest);
/* check result of Reply */
checkResult(pullbackSessionReply);
/* get and set SessionInfo of server */
ProtPullbackSessionReply protPullBack =
(ProtPullbackSessionReply) pullbackSessionReply;
execInfo.setServerSessionInformation(protPullBack.getServerSessionInfo());
/* put received SessionInformation */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#doSession(): Session Information as below.\n" +
protPullBack.getServerSessionInformationString());
/* set SessionInfo of client */
execInfo.setClientSessionInformation(this.sessionInfo);
/* set status */
setStatus("doSession", CLIENTSTATE_IDLE);
/* reset cancel flag */
this.isCanceled = false;
} finally {
/* unlock the handle */
unlockHandle();
}
/* release callbackInfo */
callbackFunc = null;
callbackFuncInfo = null;
return execInfo;
}
/**
* Sends request of suspendSession and wait for the reply.
*
* @throws GrpcException if it failed to suspend.
*/
protected void suspendSession() throws GrpcException {
try {
/* lock the handle */
lockHandle();
int status = getLocalStatus();
if ((status != CLIENTSTATE_WAIT) && (status != CLIENTSTATE_SUSPEND)) {
/* invalid status */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_ERROR,
this,
"NgGrpcHandle#suspendSession(): invalid status.");
throw new NgExecRemoteMethodException("failed to suspend");
}
/* suspend Ninf-G Executable */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#suspendSession(): suspend Session.");
Protocol suspendSessionRequest = new ProtSuspendSessionRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
Protocol suspendSessionReply =
commManager.sendProtocol(suspendSessionRequest);
/* check result of Reply */
checkResult(suspendSessionReply);
/* set status */
setStatus("suspendSession", CLIENTSTATE_SUSPEND);
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Gets ID of associated with this handle.
*
* @return ID of the executable.
*/
protected int getExecutableID() {
return executableID;
}
/**
* Sets ID of associated with this handle.
*
* @param executableID ID to set as executableID.
*/
protected void setExecutableID(int executableID) {
this.executableID = executableID;
}
/**
* Sends request of invokeCallback and wait for the reply.
*
* @param prot received Protocol(Notify of callback).
* @throws GrpcException if it failed to execute callback.
*/
protected void invokeCallback(Protocol prot) throws GrpcException {
try {
/* lock the handle */
lockHandle();
if ((isCanceled == true) || (getLocalStatus() != CLIENTSTATE_WAIT)) {
/* invalid status */
ngLog.printSessionLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_ERROR,
this,
"NgGrpcHandle#invokeCallback(): invalid status.");
throw new NgExecRemoteMethodException("failed to invoke Callback");
}
/* set status */
setStatus("invokeCallback", CLIENTSTATE_INVOKE_CALLBACK);
/* get ID of callback */
ProtInvokeCallbackNotify protInvokeCallback =
(ProtInvokeCallbackNotify) prot;
int id = protInvokeCallback.getID();
int seq = protInvokeCallback.sequence;
/* count callback */
this.sessionInfo.incrementCallbackNTimesCalled();
/* ----- Transform arguments of Ninf-G callback ----- */
/* Request arguments for callback */
timer.start();
ProtTransferCallbackArgumentRequest cbTransferArgumentRequest =
new ProtTransferCallbackArgumentRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID, id, seq,
versionMajor, versionMinor, versionPatch);
/* Receive arguments for callback */
Protocol cbTransferArgumentReply =
commManager.sendProtocol(cbTransferArgumentRequest);
this.sessionInfo.setCallbackTransferArgumentRealTime(timer.getElapsedTime());
/* check result of Reply */
checkResult(cbTransferArgumentReply);
/* create CallContext for callback */
ProtTransferCallbackArgumentReply callbackTransferArgumentReply =
(ProtTransferCallbackArgumentReply)cbTransferArgumentReply;
CallContext callContext = new CallContext(
(RemoteMethodInfo) callbackFuncInfo.get(id),
callbackTransferArgumentReply.getArgumentData(), this.intArguments);
/* get information about Compress */
boolean compressEnable =
new Boolean((String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_COMPRESS)).booleanValue();
/* set CompressInformation */
/*
* not implement on 2.4.0
if (compressEnable == true) {
this.sessionInfo.setCallbackInflateInformation(id,
callbackTransferArgumentReply.getOriginalDataLength(),
callbackTransferArgumentReply.getConvertedDataLength(),
callbackTransferArgumentReply.getConvertRealTime(),
callbackTransferArgumentReply.getConvertCPUTime());
}
*/
/* ----- call Ninf-G callback function ----- */
/* get callback Object */
NgCallbackInterface callback =
(NgCallbackInterface) callbackFunc.get(id);
/* call callback function */
timer.start();
callback.callback(callContext.getArgs());
this.sessionInfo.setCallbackCalculationRealTime(timer.getElapsedTime());
/* ----- Transform results of Ninf-G callback ----- */
/* transform Result */
callContext.transformCBResult();
/* Send result for callback */
int compressThreshold =
new Integer((String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_COMPRESS_THRESHOLD)).intValue();
int blockSize =
new Integer((String) remoteMachineInfo.get(
RemoteMachineInfo.KEY_BLOCK_SIZE)).intValue();
ProtTransferCallbackResultRequest cbTransferResultRequest =
new ProtTransferCallbackResultRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID, id, seq, callContext,
commManager.getEncodeTypes(), compressEnable, compressThreshold,
blockSize, versionMajor, versionMinor, versionPatch);
/* Receive reply for callback */
timer.start();
Protocol cbTransferResultReply =
commManager.sendProtocol(cbTransferResultRequest);
this.sessionInfo.setCallbackTransferResultRealTime(timer.getElapsedTime());
/* check result of Reply */
checkResult(cbTransferResultReply);
/* set CompressInformation */
/*
* not implement on 2.4.0
if (compressEnable == true) {
this.sessionInfo.setCallbackDeflateInformation(id,
cbTransferResultRequest.getOriginalDataLength(),
cbTransferResultRequest.getConvertedDataLength(),
cbTransferResultRequest.getConvertRealTime(),
cbTransferResultRequest.getConvertCPUTime());
}
*/
/* set status */
setStatus("invokeCallback", CLIENTSTATE_WAIT);
} finally {
/* unlock the handle */
unlockHandle();
}
}
/**
* Sends request of queryFunctionInfo and wait for the reply.
*
* @return information of a RemoteFunction/RemoteMethod.
* @throws GrpcException if it failed to execute callback.
*/
private RemoteClassInfo getRemoteClassInfo() throws GrpcException {
int status = getLocalStatus();
if ((status == CLIENTSTATE_NONE) || (status == CLIENTSTATE_INIT)) {
/* nothing will be done */
return null;
}
/* get Information of Ninf-G Executable */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#getRemoteClassInfo(): get RemoteClassInfo.");
/* send queryFunctionInfoRequest to Executable */
Protocol queryFunctionInfoRequest = new ProtQueryFunctionInfoRequest(
commManager.generateSequenceNum(), context.getID(),
executableID, sessionID,
versionMajor, versionMinor, versionPatch);
/* receive queryFunctionInfoReply from Executable */
ProtQueryFunctionInfoReply queryFunctionInfoReply =
(ProtQueryFunctionInfoReply)
commManager.sendProtocol(queryFunctionInfoRequest);
/* check result of Reply */
checkResult(queryFunctionInfoReply);
/* put RemoteClassInfo into NgInformationManager */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#getRemoteClassInfo(): put RemoteClassInfo to InformationManager.");
NgInformationManager infoManager = context.getNgInformationManager();
try {
infoManager.lockInformationManager();
infoManager.putRemoteClassInfo(className,
queryFunctionInfoReply.getRemoteClassInfo());
} finally {
infoManager.unlockInformationManager();
}
/* return information */
return queryFunctionInfoReply.getRemoteClassInfo();
}
/**
* Gets RemoteMachineInfo.
*
* @return RemoteMachineInfo of this handle.
*/
protected RemoteMachineInfo getRemoteMachineInfo() {
return remoteMachineInfo;
}
/**
* Checks result code in Protocol object.
*
* @param protocol target Protocol.
* @throws GrpcException if result code is not 0 or received protocol is null.
*/
private void checkResult(Protocol protocol) throws GrpcException {
/* check if protocol were null */
if (protocol == null) {
throw new NgException("protocol is null.");
} else if (protocol.result != 0) {
throw new NgException("protocol has invalid result code.");
}
}
/* (non-Javadoc)
* @see java.lang.Object#clone()
*/
protected Object clone() throws CloneNotSupportedException {
return super.clone();
}
/**
* Sets status code of client.
* If you set loglevel greater than 4, puts log message.
*
* @param methodName a name of method(for log message).
* @param status status code to set.
*/
private void setStatus(String methodName, int status) {
/* get String for status */
String statString = null;
switch (status) {
case CLIENTSTATE_NONE:
statString = "NONE";
break;
case CLIENTSTATE_IDLE:
statString = "IDLE";
break;
case CLIENTSTATE_INIT:
statString = "INIT";
break;
case CLIENTSTATE_INVOKE_SESSION:
statString = "INVOKE_SESSION";
break;
case CLIENTSTATE_TRANSARG:
statString = "TRANSARG";
break;
case CLIENTSTATE_WAIT:
statString = "WAIT";
break;
case CLIENTSTATE_COMPLETE_CALCULATING:
statString = "COMPLETE_CALCULATING";
break;
case CLIENTSTATE_TRANSRES:
statString = "TRANSRES";
break;
case CLIENTSTATE_PULLBACK:
statString = "PULLBACK";
break;
case CLIENTSTATE_SUSPEND:
statString = "SUSPEND";
break;
case CLIENTSTATE_RESUME:
statString = "RESUME";
break;
case CLIENTSTATE_DISPOSE:
statString = "DISPOSE";
break;
case CLIENTSTATE_RESET:
statString = "RESET";
break;
case CLIENTSTATE_CANCEL:
statString = "CANCEL";
break;
case CLIENTSTATE_INVOKE_CALLBACK:
statString = "INVOKE_CALLBACK";
break;
default:
statString = "unknown...";
}
/* set status */
ngLog.printLog(
NgLog.LOGCATEGORY_NINFG_INTERNAL,
NgLog.LOGLEVEL_INFO,
this,
"NgGrpcHandle#" + methodName + ": client status -> " + statString + ".");
this.status = new Integer(status);
}
/**
* Lock the handle.
*
* @throws GrpcException if it's interrupted.
*/
private synchronized void lockHandle() throws GrpcException {
/* wait for Unlocked */
while (isLocked == true) {
try {
wait();
} catch (InterruptedException e) {
throw new NgException(e);
}
}
/* lock */
isLocked = true;
}
/**
* Unlock the handle.
*
* @throws GrpcException if it's interrupted.
*/
private synchronized void unlockHandle() throws GrpcException {
/* check if it's locked */
if (isLocked == false) {
throw new NgException("Nobody lock the handle.");
}
/* unlock */
isLocked = false;
/* notifyAll */
notifyAll();
}
/**
* @return
*/
protected long getJobStartTimeout() {
return this.jobStartTimeout;
}
/**
* @return
*/
protected NgGrpcJob getJob() {
return this.job;
}
/**
* @return
*/
protected NgGrpcClient getContext() {
return this.context;
}
/**
* @return
*/
protected int getID() {
return this.executableID;
}
/**
* @return
*/
protected int getSessionID() {
return this.sessionID;
}
/**
* @param prop
*
* @return
*/
private static Properties getDefaultServerProperties(Properties prop) {
Properties tmpProperties = new Properties();
tmpProperties.put(RemoteMachineInfo.KEY_HOSTNAME, prop.get(RemoteMachineInfo.KEY_HOSTNAME));
return tmpProperties;
}
}