This chapter discusses how to use streams between ACMS and an agent program. This chapter also provides reference material for calling the stream services in agent programs.
Streams are ACMS communication channels that permit ACMS to communicate with devices not supported by DECforms or TDMS. In most instances, you can now use the Request Interface (RI) to communicate with unsupported devices. Stream services are still useful, however, if:
Stream services are supported but will not be developed further in subsequent versions of ACMS.
ACMS can use streams to communicate with agents. Agents can communicate with unsupported devices.
You use ACMS$INIT_EXCHANGE_IO to create and connect a stream. See Section 4.6 for details regarding ACMS$INIT_EXCHANGE_IO.
You use ACMS$TERM_EXCHANGE_IO to disconnect and delete a stream. See Section 4.9 for details regarding ACMS$TERM_EXCHANGE_IO.
You use ACMS$WAIT_FOR_STREAM_IO and ACMS$REPLY_TO_STREAM_IO to wait for and reply to messages from the EXC.
If an agent program enables stream I/O and associates it with a submitter, the agent program must call ACMS$WAIT_FOR_STREAM_IO for all tasks (except tasks that do no terminal I/O), whether or not the task performs stream I/O.
Figure 6-1 shows an agent program using a stream to communicate with ACMS.
Figure 6-1 Using Stream Services to Communicate with ACMS
Table 6-1 lists the services for stream communication and gives a brief description of each. (Reference material in this chapter lists these services in alphabetical order.)
Service Name | Description |
---|---|
ACMS$WAIT_FOR_STREAM_IO | Waits for a message on the stream. After processing the information, use ACMS$REPLY_TO_STREAM_IO to acknowledge the exchange. If an agent program enables stream I/O and associates it with a submitter, the agent program must call ACMS$WAIT_FOR_STREAM_IO for all tasks (except tasks that do no terminal I/O), whether or not the task performs stream I/O. |
ACMS$REPLY_TO_STREAM_IO | Acknowledges completion of a stream exchange step that was detected using ACMS$WAIT_FOR_STREAM_IO. The task instance then resumes execution. Use this service in combination with ACMS$WAIT_FOR_STREAM_IO. |
When a task performs stream I/O, the block step for that task must use the WITH STREAM I/O phrase. Also, if the task performs stream I/O, the agent program must call the task using either the ACMS$START_CALL service or the asynchronous ACMS$CALL_A service. The EXC performs stream I/O for a task when it encounters the READ, READ WITH PROMPT, and WRITE clauses in the exchange step of a task definition. For example:
    BLOCK WORK WITH STREAM I/O
    task1_exchange1:
        EXCHANGE IS READ ACMS$DATA_WORKSPACE;
    task1_process1:
        PROCESSING IS CALL procedure-name USING ACMS$DATA_WORKSPACE;
    task1_exchange2:
        EXCHANGE IS WRITE ACMS$DATA_WORKSPACE;
    END BLOCK;
The EXC is the active end of the stream because it interprets the task definition and sends and requests information from the agent program. The agent program is the passive end of the stream because it does not initiate any communication, but waits for and reacts to requests from the EXC.
The agent program calls the ACMS$WAIT_FOR_STREAM_IO service to wait for a request from the EXC. After processing the information, the agent program calls the ACMS$REPLY_TO_STREAM_IO service to respond to the request.
Before any communication can begin, however, the agent program must initialize a stream. Earlier versions of ACMS used four other stream services. These earlier services have been superseded. The superseded services are:
ACMS supports these services for agent programs that have already been developed. In new agent programs, however, use ACMS$INIT_EXCHANGE_IO and ACMS$TERM_EXCHANGE_IO. Whenever practical, replace superseded services with ACMS$INIT_EXCHANGE_IO and ACMS$TERM_EXCHANGE_IO.
Superseded services are discussed in Appendix A.
6.2 ACMS$REPLY_TO_STREAM_IO
This service responds to I/O requests on the stream. If an input string
is provided, the agent program must gather information for the
ACMS$WAIT_FOR_STREAM_IO input string and fill the string before calling
this service.
ACMS$REPLY_TO_STREAM_IO (connect_id.rq.r,
    io_id.wq.r,
    [io_status.rl.r])

ACMS$REPLY_TO_STREAM_IO_A (connect_id.rq.r,
    io_id.wq.r,
    [io_status.rl.r],
    [comp_status.wq.r],
    [efn.rbu.r],
    [astadr.szem.r],
    [astprm.rz.v])
connect_id
The identification of the stream you are replying to. The ACMS$INIT_EXCHANGE_IO service returns this connect ID.
io_id
The identification of the message you are replying to. The ACMS$WAIT_FOR_STREAM_IO service returns this ID.
io_status
The agent program must provide a success value if it wishes to complete the exchange and allow the task to continue. The agent program must provide a failure value if the agent program has detected an error and wishes to cancel the exchange and cancel the task.
The parameters comp_status.wq.r, efn.rbu.r, astadr.szem.r, and astprm.rz.v are asynchronous service arguments. See Chapter 2 for a discussion of these parameters.
The return status codes indicating success or failure of the call follow:
Status | Severity Level | Description |
---|---|---|
ACMS$_NORMAL | Success | Normal successful completion. |
ACMS$_PENDING | Informational | Successful operation pending asynchronous completion. The final status is in the completion status block. |
ACMS$_BADCONID | Error | The connect ID is not correct; either the stream is not connected, the stream is disconnected, or the connect ID is corrupt. |
ACMS$_BADIOID | Error | The I/O ID is not correct; either there is no such I/O request, the I/O request has already been replied to, or the I/O ID is corrupt. |
ACMS$_CANOTDOIO | Error | Cannot perform stream I/O on this connection. The stream is not yet connected. |
ACMS$_IONOTACT | Error | The specified I/O request was not active. |
ACMS$_INSUFPRM | Error | Not enough arguments were passed to this service. |
ACMS$_INVASTADR | Error | The AST address was invalid. |
ACMS$_INVASTPRM | Error | The AST routine parameter was invalid. |
ACMS$_INVCMPSTS | Error | The completion status block was invalid. |
ACMS$_INVCONID | Error | The connect ID was invalid. |
ACMS$_INVEFN | Error | The event flag was invalid. |
ACMS$_INVIOID | Error | The I/O ID was invalid. |
ACMS$_STRMMSGTOOBIG | Error | The size of the objects, including stream overhead, is too large to send over the stream. |
ACMS$_SYNASTLVL | Error | You cannot call synchronous services from AST level. |
ACMS$_WRONGCON | Error | The specified I/O request was not on this connection. |
ACMS$_BADACTREC | Fatal | Bad record format in active response message. |
6.3 ACMS$WAIT_FOR_STREAM_IO
Waits for I/O messages. This service completes when the Application
Execution Controller (EXC) executes a READ or WRITE clause in the task
definition. If an agent program enables stream I/O and associates it
with a submitter, the agent program must call ACMS$WAIT_FOR_STREAM_IO
for all tasks (except tasks that do no terminal I/O), whether or not
the task performs stream I/O.
When ACMS$WAIT_FOR_STREAM_IO returns the status code ACMS$_SENDER_DISCONN, the EXC has disconnected from the stream. The agent program then calls ACMS$WAIT_FOR_CALL_END to wait for the end of the task.
ACMS$WAIT_FOR_STREAM_IO (connect_id.rq.r,
    output_object.wz.r,
    input_object.wz.r,
    io_id.wq.r,
    [cancel_routine.zem.r],
    [cancel_param.rz.v])

ACMS$WAIT_FOR_STREAM_IO_A (connect_id.rq.r,
    output_object.wz.r,
    input_object.wz.r,
    io_id.wq.r,
    [comp_status.wq.r],
    [efn.rbu.r],
    [astadr.szem.r],
    [astprm.rz.v],
    [cancel_routine.zem.r],
    [cancel_param.rz.v])
connect_id
The identification of the stream on which you are waiting for data requests. The ACMS$INIT_EXCHANGE_IO service returns this connect ID.
output_object
The message sent over the stream from the Application Execution Controller (EXC) to the agent program. This is a pointer to a string descriptor containing either a prompt or output information from the EXC. The agent program can use this output as a terminal prompt (if the task uses a terminal) or, for example, as a key to get a record from a file. If there is no output, the parameter contains a zero; otherwise, it contains the address of the string descriptor.
input_object
The address of the message to send over the stream from the agent program to the EXC. This is a pointer to an empty string descriptor reserved for reply information from the agent. The agent program gathers input and puts it into this descriptor before calling the ACMS$REPLY_TO_STREAM_IO service. If there is no input, the parameter contains a zero; otherwise, it contains the address of the string descriptor.
The agent program truncates the strings or blank fills them to the right as necessary.
io_id
The identification that this service returns. ACMS$REPLY_TO_STREAM_IO later uses this ID to distinguish which I/O request to reply to.
cancel_routine
When you use the cancel_routine parameter in an agent, the agent program is notified if the ACMS EXC requests to cancel the current stream I/O operation. If the cancel_routine parameter is omitted, the agent program is not notified of the EXC request to cancel the stream I/O. The cancel_routine executes at AST level and is passed the following parameters:
- cancel_param
The cancel parameter that was passed with the ACMS$WAIT_FOR_STREAM_IO service.
- connect_id
The address of the two-longword identification of the current stream connection. ACMS$INIT_EXCHANGE_IO passes this ID.
- io_id
The address of the two-longword identification of the stream I/O request to cancel.
After receiving cancel notification, the agent program responds to the cancellation by calling ACMS$REPLY_TO_STREAM_IO. Until the agent program calls ACMS$REPLY_TO_STREAM_IO, ACMS cannot cancel the task.
cancel_param
The value to be passed to the cancel routine.
The parameters comp_status.wq.r, efn.rbu.r, astadr.szem.r, and astprm.rz.v are asynchronous service arguments. See Chapter 2 for a discussion of these parameters.
The return status codes indicating success or failure of the call follow:
Status | Severity Level | Description |
---|---|---|
ACMS$_NORMAL | Success | Normal successful completion. |
ACMS$_PENDING | Informational | Successful operation pending asynchronous completion. The final status is in the completion status block. |
ACMS$_DISC_PURGE | Warning | A disconnect purged the I/O request. |
ACMS$_SENDER_DISCONN | Warning | The sender has disconnected from the stream. |
ACMS$_BADCONID | Error | The connect ID is not correct; either the stream is not connected, the stream is disconnected, or the connect ID is corrupt. |
ACMS$_CANOTDOIO | Error | Cannot perform stream I/O on this connection. The stream is not yet connected. |
ACMS$_INSUFPRM | Error | Not enough arguments were passed to this service. |
ACMS$_INVASTADR | Error | The AST address was invalid. |
ACMS$_INVASTPRM | Error | The AST routine parameter was invalid. |
ACMS$_INVCMPSTS | Error | The completion status block was invalid. |
ACMS$_INVCONID | Error | The connect ID was invalid. |
ACMS$_INVEFN | Error | The event flag was invalid. |
ACMS$_INVINOBJ | Error | The input object was invalid. |
ACMS$_INVIOID | Error | The I/O ID was invalid. |
ACMS$_INVOUTOBJ | Error | The output object was invalid. |
ACMS$_IO_ACTIVE | Error | An I/O request is already active on this connect; only one I/O request is allowed at a time. |
ACMS$_SYNASTLVL | Error | You cannot call synchronous services from AST level. |
This chapter includes examples of agent programs written in C, FORTRAN, BLISS, and Pascal:
Example 7-1 is a user-written agent program in C that starts a distributed transaction. The agent program uses the $START_TRANSW system service to start the distributed transaction.
The flow of events in the agent program is as follows (the numbers correspond to those in the sample program):
Compaq ACMS for OpenVMS Writing Applications contains additional information about this program.
Example 7-1 C Agent Program that Starts a Distributed Transaction

    /*
     * Version:     01
     * Edit dates:  06-MAR-90
     * Authors:     DIGITAL
     */

    /*
     * FUNCTIONAL DESCRIPTION
     *
     * VR_AGENT is an ACMS agent program that acts like an ATM
     * where you type in your reservation number and odometer
     * reading, drop the keys in a slot, and walk away. The
     * system bills you later for the amount you owe. The
     * agent uses QIOs to get the data, starts a distributed
     * transaction, then calls a task to do the work. The task
     * consists of a nonparticipating step that validates the
     * reservation number, a step that queues a task to do the
     * actual checkin work, and a step that writes a history
     * record. If the task succeeds, the agent commits the
     * transaction. If the task fails, the agent aborts the
     * transaction and notifies the user of the problem.
     * The agent is also responsible for handling errors, such
     * as transaction timeouts.
     */

    /* Include's / Define's / Macros Required */

    #include descrip
    #include iodef
    #include rmsdef
    #include ssdef
    #include stdio
    #include stsdef
    #include ACMS$SUBMITTER
    #include ACMS$STREAM

    #define MAX_RESERVATION 10
    #define MAX_ODOMETER 6
    #define MAX_RETRY 5
    #define TRUE 1
    #define FALSE 0
    #define CANCEL 'C'
    #define check_status(stat) if (!(stat & 1)) LIB$STOP(stat)

    /* Declare Global Data */

    globalvalue ACMS$_NOSUCH_PKG;
    globalvalue ACMS$_SRVDEAD;
    globalvalue ACMS$_TRANSTIMEDOUT;
    globalvalue RDB$_DEADLOCK;
    globalvalue RDB$_LOCK_CONFLICT;
    globalvalue RDMS$_DEADLOCK;
    globalvalue RDMS$_LCKCNFLCT;
    globalvalue RDMS$_TIMEOUT;

    typedef struct {
        short int bufsize;
        short int itmcode;
        int bufadr;
        int retlen;
    } item;

    typedef int quadword[2];

    struct fast_check_in_blk {
        int reservation_id;
        int return_odometer_reading;
        quadword actual_return_date;
    } fast_check_in_wksp;

    struct io_stat_blk {
        short int status;
        short int msg_len;
        int unused;
    } iosb;

    struct {
        item pr_id;
        int terminator;
    } task_info_list;

    int status, *tid[ 4 ], argument_list[ 5 ];
    short chan;
    char task_status[ 80 ];

    $DESCRIPTOR(task_status_desc, task_status);
    $DESCRIPTOR(task_name_desc, "VR_FAST_CHECKIN_TASK");
    $DESCRIPTOR(appl_name_desc, "VR_APPL");

    struct dsc$descriptor_s fast_check_in_wksp_desc;
    struct ACMS$SUBMITTER_ID submitter_id;
    struct ACMS$PROCEDURE_ID procedure_id;
    struct ACMS$CALL_ID call_id;

    main ()
    /*
     * Get procedure information to see if the application is running.
     *
     * While the application is up and running, prompt user for
     * reservation ID and odometer reading.
     *
     * If the user enters the data, process the fast checkin transaction.
     *
     * If the user aborts, then notify the user that the transaction was
     * not processed.
     */
    {
        for (;;)
        {
            status = initialization ();
            check_status(status);

            status = ACMS$GET_PROCEDURE_INFO(&submitter_id,
                                             &task_name_desc,
                                             &appl_name_desc,
                                             &task_info_list);

            while (status & STS$M_SUCCESS)
            {
                status = get_data ();                        /* (1) */
                if (status & STS$M_SUCCESS)
                    status = process_this_transaction();
                else if (status == RMS$_EOF)
                    status = report_user_abort();
                check_status(status);

                status = ACMS$GET_PROCEDURE_INFO(&submitter_id,
                                                 &task_name_desc,
                                                 &appl_name_desc,
                                                 &task_info_list);
            }

            if (status == ACMS$_NOSUCH_PKG)
                status = application_not_running();
            check_status(status);

            status = termination ();
            check_status(status);
        }
    }

    initialization ()
    /*
     * Assign channel and sign user in to ACMS.
     * Set up descriptors, task info list, and
     * argument lists for later processing
     */
    {
        $DESCRIPTOR(terminal, "SYS$COMMAND");

        status = SYS$ASSIGN (&terminal, &chan,0,0);

        if (status & STS$M_SUCCESS)
            status = ACMS$SIGN_IN(&submitter_id, 0, 0);

        if (status & STS$M_SUCCESS)
        {
            fast_check_in_wksp_desc.dsc$w_length = sizeof(fast_check_in_wksp);
            fast_check_in_wksp_desc.dsc$a_pointer = &fast_check_in_wksp;
            fast_check_in_wksp_desc.dsc$b_dtype = DSC$K_DTYPE_T;
            fast_check_in_wksp_desc.dsc$b_class = DSC$K_CLASS_S;

            task_info_list.pr_id.bufsize = ACMS$S_PROCEDURE_ID;
            task_info_list.pr_id.itmcode = ACMS$K_PROC_PROCEDURE_ID;
            task_info_list.pr_id.bufadr = &procedure_id;
            task_info_list.pr_id.retlen = 0;
            task_info_list.terminator = 0;

            argument_list[ 0 ] = 4;
            argument_list[ 1 ] = 0;
            argument_list[ 2 ] = &task_status_desc;
            argument_list[ 3 ] = 0;
            argument_list[ 4 ] = &fast_check_in_wksp_desc;
        }
        return status;
    }

    get_data ()
    /*
     * Prompt for reservation ID and odometer reading.
     *
     * For the purpose of this example, it is expected
     * that input will consist of numeric characters &
     * that the user will enter leading zeroes.
     *
     * e.g., Reservation ID   000123456
     *       Odometer Reading 05575
     *
     * Validation will be done to ensure this; the
     * user can abort by entering "Cancel."
     */
    {
        short input_complete;
        char reservation[ MAX_RESERVATION ];
        char odometer[ MAX_ODOMETER ];
        $DESCRIPTOR(reservation_desc, reservation);
        $DESCRIPTOR(odometer_desc, odometer);
        $DESCRIPTOR(input_reservation_id,
                    "Input Reservation Id 'Cancel' to Exit: ");
        $DESCRIPTOR(input_odometer,
                    "Input Odometer Reading 'Cancel' to Exit: ");

        input_complete = FALSE;
        while (input_complete == FALSE)
        {
            printf("\n");
            status = SYS$QIOW (0, chan, IO$_READPROMPT|IO$M_CVTLOW,
                               &iosb, 0, 0,
                               &reservation,
                               reservation_desc.dsc$w_length,
                               0, 0,
                               input_reservation_id.dsc$a_pointer,
                               input_reservation_id.dsc$w_length);
            if (status & STS$M_SUCCESS)
                status = iosb.status;
            if ((status & STS$M_SUCCESS) && (reservation[0] == CANCEL))
                status = RMS$_EOF;
            if (status & STS$M_SUCCESS)
                status = OTS$CVT_TU_L(&reservation_desc,
                                      &fast_check_in_wksp.reservation_id,
                                      4,0);
            if ((status & STS$M_SUCCESS) || (status == RMS$_EOF))
                input_complete = TRUE;
        }

        if (status & STS$M_SUCCESS)
        {
            input_complete = FALSE;
            while (input_complete == FALSE)
            {
                printf("\n");
                status = SYS$QIOW (0, chan, IO$_READPROMPT|IO$M_CVTLOW,
                                   &iosb, 0, 0,
                                   &odometer,
                                   odometer_desc.dsc$w_length,
                                   0, 0,
                                   input_odometer.dsc$a_pointer,
                                   input_odometer.dsc$w_length);
                if (status & STS$M_SUCCESS)
                    status = iosb.status;
                if ((status & STS$M_SUCCESS) && (odometer[0] == CANCEL))
                    status = RMS$_EOF;
                if (status & STS$M_SUCCESS)
                    status = OTS$CVT_TU_L(&odometer_desc,
                                 &fast_check_in_wksp.return_odometer_reading,
                                 4,0);
                if ((status & STS$M_SUCCESS) || (status == RMS$_EOF))
                    input_complete = TRUE;
            }
        }

        if (status & STS$M_SUCCESS)
            status = SYS$GETTIM (&fast_check_in_wksp.actual_return_date);

        return status;
    }

    process_this_transaction()
    /*
     * Start transaction. Call the task. Commit if successful.
     * Abort if failure. Retry if timed out. Notify user whether
     * transaction succeeded or failed.
     */
    {
        short retry, trans_completed;

        retry = 0;
        trans_completed = FALSE;

        while ((trans_completed == FALSE) && (retry < MAX_RETRY))
        {
            status = SYS$START_TRANSW (0,0,&iosb,0,0,tid);       /* (2) */
            if (status & STS$M_SUCCESS)
                status = iosb.status;
            check_status(status);

            status = call_return_task();                         /* (3) */

            if (status & STS$M_SUCCESS)
            {
                status = SYS$END_TRANSW (0,0,&iosb,0,0,tid);     /* (4) */
                if (status & STS$M_SUCCESS)
                    status = iosb.status;
                check_status(status);
                trans_completed = TRUE;
            }
            else
            {
                if ((status == ACMS$_TRANSTIMEDOUT) ||
                    (status == ACMS$_SRVDEAD) ||
                    (status == RDB$_DEADLOCK) ||
                    (status == RDMS$_DEADLOCK) ||
                    (status == RDB$_LOCK_CONFLICT) ||
                    (status == RDMS$_LCKCNFLCT) ||
                    (status == RDMS$_TIMEOUT))
                    ++retry;
                else
                    retry = MAX_RETRY;

                status = SYS$ABORT_TRANSW (0,0,&iosb,0,0,tid);   /* (5) */
                if (status & STS$M_SUCCESS)
                    status = iosb.status;
                check_status(status);
            }
        }

        if (trans_completed == FALSE)
            status = notify_failure();
        else
            status = notify_success();

        return status;
    }

    call_return_task()
    /*
     * Call the task. Error handling will be done by the calling
     * task.
     */
    {
        status = ACMS$START_CALL (&submitter_id,
                                  &procedure_id,
                                  &call_id,
                                  argument_list,
                                  tid);
        if (status & STS$M_SUCCESS)
            status = ACMS$WAIT_FOR_CALL_END (&submitter_id, &call_id);
        return status;
    }

    notify_failure ()
    /*
     * Failure returned from called task is displayed to the user.
     */
    {
        printf("\n");
        status = SYS$QIOW (0, chan, IO$_WRITEVBLK, &iosb, 0, 0,
                           task_status_desc.dsc$a_pointer,
                           task_status_desc.dsc$w_length,
                           0, 0, 0, 0);
        return status;
    }

    application_not_running()
    /*
     * Display application not running and wait 60 seconds.
     */
    {
        float wait_time = 10.0;
        $DESCRIPTOR(appl_not_up_msg,
                    "Application not Available at this time" );

        printf("\n");
        status = SYS$QIOW (0, chan, IO$_WRITEVBLK, &iosb, 0, 0,
                           appl_not_up_msg.dsc$a_pointer,
                           appl_not_up_msg.dsc$w_length,
                           0, 0, 0, 0);
        if (status & STS$M_SUCCESS)
            status = iosb.status;
        if (status & STS$M_SUCCESS)
            status = LIB$WAIT(&wait_time);
        return status;
    }

    notify_success()
    /*
     * Display transaction has been successfully completed.
     */
    {
        $DESCRIPTOR(success_queued_msg, "Transaction Successfully Queued ");

        printf("\n");
        status = SYS$QIOW (0, chan, IO$_WRITEVBLK, &iosb, 0, 0,
                           success_queued_msg.dsc$a_pointer,
                           success_queued_msg.dsc$w_length,
                           0, 0, 0, 0);
        if (status & STS$M_SUCCESS)
            status = iosb.status;
        return status;
    }

    report_user_abort()
    /*
     * Display operation canceled at user's request.
     */
    {
        $DESCRIPTOR(user_abort_msg,
                    "Fast Checkin Has Been Canceled by user ");

        printf("\n");
        status = SYS$QIOW (0, chan, IO$_WRITEVBLK, &iosb, 0, 0,
                           user_abort_msg.dsc$a_pointer,
                           user_abort_msg.dsc$w_length,
                           0, 0, 0, 0);
        if (status & STS$M_SUCCESS)
            status = iosb.status;
        return status;
    }

    termination ()
    /*
     * Deassign channel and sign user out of ACMS
     */
    {
        status = SYS$DASSGN(chan);
        if (status & STS$M_SUCCESS)
            status = ACMS$SIGN_OUT(&submitter_id);
        return status;
    }