575 lines
15 KiB
SAS
Executable File
575 lines
15 KiB
SAS
Executable File
/**
|
|
@file
|
|
@brief Loads CSV and sends to the staging area for approval
|
|
@details this macro used to capture multiple CSVs (eg from one excel file) in
|
|
a staging area and send them all to a landing area. For simplicity this
|
|
functionality is now deprecated, and each load should be made as a seperate
|
|
request. If there is a use case to load multiple tables at once, the client
|
|
should manage this and load them seperately.
|
|
|
|
@param url= used for debugging (provided by stagedata stp)
|
|
@param dlm= use to provide alternative delimeters for CSVs (not just comma)
|
|
@param [in] termstr= (crlf) Always crlf from adapter, whereas loadfile service
|
|
figures it out
|
|
|
|
<h4> SAS Macros </h4>
|
|
@li dc_assignlib.sas
|
|
@li mf_getattrn.sas
|
|
@li mf_getuser.sas
|
|
@li mf_mkdir.sas
|
|
@li mf_verifymacvars.sas
|
|
@li mp_abort.sas
|
|
@li mp_cntlout.sas
|
|
@li mp_dirlist.sas
|
|
@li mp_lockanytable.sas
|
|
@li mpe_accesscheck.sas
|
|
@li mpe_alerts.sas
|
|
@li mpe_xlmapvalidate.sas
|
|
@li mpe_loadfail.sas
|
|
@li mpe_runhook.sas
|
|
|
|
@version 9.2
|
|
@author 4GL Apps Ltd
|
|
@copyright 4GL Apps Ltd. This code may only be used within Data Controller
|
|
and may not be re-distributed or re-sold without the express permission of
|
|
4GL Apps Ltd.
|
|
**/
|
|
|
|
%macro mpe_loader(
|
|
mperef= /* name of subfolder containing the staged data */
|
|
,mDebug=0 /* set to 1 for development or debugging */
|
|
,submitted_reason_txt= /* populates column of same name in sumo_approvals*/
|
|
,approver= /* allows a userid to be provided for direct approval email */
|
|
,url= /* optional - url for debugging */
|
|
,dlm=%str(,)
|
|
,termstr=crlf
|
|
,dc_dttmtfmt=E8601DT26.6
|
|
);
|
|
%put entered mpe_loader from &=_program;
|
|
%put &=url;
|
|
%put &=termstr;
|
|
%put &=dlm;
|
|
/* determine full path to CSV directory */
|
|
%local now;
|
|
%let now=&dc_dttmtfmt;
|
|
%put &=now;
|
|
|
|
/**
|
|
* get full path to package (only subdirectory passed through)
|
|
*/
|
|
%mp_abort(
|
|
iftrue=(%mf_verifymacvars(mperef mpelocapprovals)=0)
|
|
,mac=bitemporal_dataloader
|
|
,msg=%str(Missing: mperef mpelocapprovals)
|
|
)
|
|
|
|
%let csv_dir=%trim(&mpelocapprovals/&mperef);
|
|
|
|
/* exit if package has already been uploaded */
|
|
%local check;
|
|
proc sql noprint;
|
|
select count(*) into: check
|
|
from &mpelib..mpe_loads
|
|
where csv_dir="&mperef";
|
|
%if &check %then %do;
|
|
%mp_abort(msg=Folder &mperef already has an entry in &mpelib..mpe_loads
|
|
,mac=mpe_loader.sas);
|
|
%return;
|
|
%end;
|
|
|
|
/* get CSV directory contents */
|
|
%mp_dirlist(path=&csv_dir,outds=WORK.getfiles)
|
|
data WORK.csvs;
|
|
set WORK.getfiles;
|
|
if upcase(scan(filename,3,'.'))='CSV' then do;
|
|
lib=upcase(scan(filename,1,'.'));
|
|
ds=upcase(scan(filename,2,'.'));
|
|
output;
|
|
end;
|
|
run;
|
|
|
|
/* get table attributes */
|
|
proc sql noprint;
|
|
create table WORK.sumo_tables as
|
|
select a.filename, b.*
|
|
from WORK.csvs a
|
|
left join &mpelib..mpe_tables b
|
|
on a.lib=b.libref
|
|
and a.ds=b.dsn
|
|
where b.tx_from le &now
|
|
and &now lt b.tx_to;
|
|
|
|
/* define user as meta user if available */
|
|
%local user;
|
|
%let user=%mf_getuser();
|
|
|
|
/* check if there is actually a table to load */
|
|
%if %mf_getattrn(WORK.sumo_tables,NLOBS)=0 %then %do;
|
|
%let msg=Table not registered in &mpelib..mpe_tables;
|
|
%mpe_loadfail(
|
|
status=&msg
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%mp_abort(msg=&msg,mac=mpe_loader.sas);
|
|
%return;
|
|
%end;
|
|
|
|
proc sql;
|
|
insert into &mpelib..mpe_loads
|
|
set USER_NM="&user"
|
|
,STATUS='IN PROGRESS'
|
|
,CSV_dir="&mperef"
|
|
,PROCESSED_DTTM=&now
|
|
,reason_txt = symget('submitted_reason_txt');
|
|
|
|
|
|
/* import CSV */
|
|
|
|
%let droplist=;
|
|
%let attrib=;
|
|
%let droplist=;
|
|
%let libref=;
|
|
%let DS=;
|
|
|
|
/* get table info */
|
|
data _null_;
|
|
set sumo_tables;
|
|
libds=upcase(cats(libref,'.',dsn));
|
|
call symputx('orig_libds',libds);
|
|
is_fmt=0;
|
|
if substr(cats(reverse(dsn)),1,3)=:'CF-' then do;
|
|
libds=scan(libds,1,'-');
|
|
putlog "Format Catalog Captured";
|
|
libds='work.fmtextract';
|
|
is_fmt=1;
|
|
end;
|
|
call symputx('is_fmt',is_fmt);
|
|
call symputx('libds',libds);
|
|
call symputx('FNAME',filename);
|
|
call symputx('LIBREF',libref);
|
|
call symputx('DS',dsn);
|
|
call symputx('LOADTYPE',loadtype);
|
|
call symputx('BUSKEY',buskey);
|
|
call symputx('VAR_TXFROM',var_txfrom);
|
|
call symputx('VAR_TXTO',var_txto);
|
|
call symputx('VAR_BUSFROM',var_busfrom);
|
|
call symputx('VAR_BUSTO',var_busto);
|
|
call symputx('VAR_PROCESSED',var_processed);
|
|
call symputx('RK_UNDERLYING',RK_UNDERLYING);
|
|
call symputx('POST_EDIT_HOOK',POST_EDIT_HOOK);
|
|
call symputx('NOTES',NOTES);
|
|
call symputx('PK',coalescec(RK_UNDERLYING,buskey));
|
|
call symputx('NUM_OF_APPROVALS_REQUIRED',NUM_OF_APPROVALS_REQUIRED,'l');
|
|
put (_all_)(=);
|
|
stop;
|
|
run;
|
|
|
|
%if %length(&ds)=0 %then %do;
|
|
%let msg=%str(ERR)OR: Unable to extract record from &mpelib..mpe_tables;
|
|
%mpe_loadfail(
|
|
status=FAILED
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,reason_txt=%quote(&msg)
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%mp_abort(msg=&msg,mac=mpe_loader.sas);
|
|
%return;
|
|
%end;
|
|
|
|
/* export format catalog */
|
|
%mp_cntlout(
|
|
iftrue=(&is_fmt=1)
|
|
,libcat=&orig_libds
|
|
,fmtlist=0
|
|
,cntlout=work.fmtextract
|
|
)
|
|
|
|
/* user must have EDIT access to load a table */
|
|
%mpe_accesscheck(&orig_libds
|
|
,outds=work.sumo_access
|
|
,user=&user
|
|
,access_level=EDIT )
|
|
%put exiting accesscheck;
|
|
|
|
%if %mf_getattrn(work.sumo_access,NLOBS)=0 %then %do;
|
|
%let msg=%str(ERR)OR: User is not authorised to edit &orig_libds!;
|
|
%mpe_loadfail(
|
|
status=UNAUTHORISED
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,reason_txt=%quote(&msg)
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%mp_abort(msg=&msg,mac=mpe_loader.sas);
|
|
%return;
|
|
%end;
|
|
|
|
%put now importing: "&csv_dir/&fname" termstr=&termstr;
|
|
/* get the variables from the CSV */
|
|
data vars_csv1(index=(idxname=(varnum name)) drop=infile);
|
|
infile "&csv_dir/&fname" lrecl=32767 dsd termstr=&termstr encoding='utf-8';
|
|
input;
|
|
length infile $32767;
|
|
infile=compress(_infile_,'"',);
|
|
infile=compress(infile,"'",);
|
|
format name $32.;
|
|
putlog 'received vars: ' infile;
|
|
call symputx('received_vars',infile,'l');
|
|
do varnum=1 to countw(infile,"&dlm");
|
|
/* keep writeable chars */
|
|
name=compress(upcase(scan(infile,varnum)),,'kw');
|
|
if name ne "_____DELETE__THIS__RECORD_____" then output;
|
|
end;
|
|
stop;
|
|
run;
|
|
%put received_vars = &received_vars;
|
|
|
|
%dc_assignlib(WRITE,&libref)
|
|
|
|
/* get list of variables and their formats */
|
|
proc contents noprint data=&libds
|
|
out=vars(keep=name type length varnum format:);
|
|
run;
|
|
data vars(keep=name type length varnum format);
|
|
set vars(rename=(format=format2 type=type2));
|
|
name=upcase(name);
|
|
format2=upcase(format2);
|
|
/* not interested in transaction or processing dates
|
|
(append table must be supplied without them) */
|
|
if name not in ("&VAR_TXFROM","&VAR_TXTO","&VAR_PROCESSED"
|
|
,"_____DELETE__THIS__RECORD_____");
|
|
if type2 in (2,6) then do;
|
|
length format $49.;
|
|
if format2='' then format=cats('$',length,'.');
|
|
else format=cats(format2,max(formatl,length),'.');
|
|
type='char';
|
|
end;
|
|
else do;
|
|
if format2='' then format=cats(length,'.');
|
|
else if format2=:'DATETIME' or format2=:'E8601DT' then do;
|
|
format='DATETIME19.';
|
|
end;
|
|
else if format2=:'DATE' or format2=:'DDMMYY'
|
|
or format2=:'MMDDYY' or format2=:'YYMMDD'
|
|
or format2=:'E8601DA' or format2=:'B8601DA'
|
|
then do;
|
|
format='DATE9.';
|
|
end;
|
|
else if format2='BEST' & formatl=0 then format=cats('BEST',length,'.');
|
|
/*
|
|
else if format2=:'DATETIME' or format2=:'DATE' or format2=:'DDMMYY'
|
|
or format2=:'MMDDYY' or format2=:'YYMMDD' then do;
|
|
*date or datetime format so use original ;
|
|
dsid=open("&libref..&ds");
|
|
vnum=varnum(dsid,name);
|
|
format=varfmt(dsid,vnum);
|
|
dsid=close(dsid);
|
|
end;
|
|
*/
|
|
else do;
|
|
if formatl=0 then formatl=length;
|
|
format=cats(format2,formatl,'.',formatd);
|
|
end;
|
|
type='num';
|
|
end;
|
|
put (_all_)(=);
|
|
run;
|
|
|
|
/* build attrib statement */
|
|
data vars_attrib;
|
|
length attrib_statement $32767 type2 $20;
|
|
set vars end=lastobs;
|
|
retain attrib_statement;
|
|
|
|
if type='char' then type2='$';
|
|
str1=catx(' ',name,'length=',cats(type2,length));
|
|
attrib_statement=trim(attrib_statement)!!' '!!trim(str1);
|
|
|
|
if lastobs then call symputx('ATTRIB',attrib_statement,'L');
|
|
run;
|
|
|
|
/* build input statement - first get vars in right order
|
|
and join with target formats*/
|
|
proc sql noprint;
|
|
create table vars_csv2 as
|
|
select b.*
|
|
from vars_csv1 a
|
|
left join vars_attrib b
|
|
on a.name=b.name
|
|
order by a.varnum;
|
|
|
|
|
|
/* now build input statement */
|
|
data final_check;
|
|
set vars_csv2 end=lastobs;
|
|
length input_statement $32767 type2 $20 droplist $32767;
|
|
retain input_statement droplist;
|
|
|
|
/* Build input statement - CATCH EXCEPTIONS HERE!*/
|
|
if name in ('QUOTE_DTTM') then do;
|
|
name=cats(name,'2');
|
|
droplist=catx(' ',trim(droplist),name);
|
|
type2='$20.';/* converted below */
|
|
end;
|
|
else if type='char' then type2=cats('$CHAR', length,'.');
|
|
else if format='DATE9.' then type2='ANYDTDTE.';
|
|
else if format='DATETIME19.' then type2='ANYDTDTM.';
|
|
else if format=:'TIME' then type2='ANYDTTME.';
|
|
else if name='' then do;/* additional vars in input data */
|
|
name='_____DELETE__THIS__VARIABLE_____';
|
|
droplist=catx(' ',trim(droplist),'_____DELETE__THIS__VARIABLE_____');
|
|
type2='$1.';
|
|
end;
|
|
else type2='best32.';
|
|
* else type2=cats(length,'.');
|
|
|
|
input_statement=catx(' ',input_statement,name,':',type2);
|
|
|
|
if lastobs then do;
|
|
call symputx('INPUT', input_statement,'L');
|
|
if trim(droplist) ne '' then
|
|
call symputx('droplist',"drop "!!droplist!!';','l');
|
|
end;
|
|
run;
|
|
|
|
%let mpeloadstop=0;
|
|
|
|
data work.STAGING_DS;
|
|
&droplist;
|
|
infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767
|
|
firstobs=2 missover termstr=&termstr encoding='utf-8';
|
|
attrib &attrib ;
|
|
if _n_=1 then call missing (of _all_);
|
|
missing a b c d e f g h i j k l m n o p q r s t u v w x y z _;
|
|
input
|
|
%if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____ %then %do;
|
|
_____DELETE__THIS__RECORD_____: $3.
|
|
%end;
|
|
&input;
|
|
|
|
%if %index(%quote(&attrib.),UNLIKELY_VAR ) %then %do;
|
|
/*UNLIKELY_VAR=input(UNLIKELY_VAR2,ANYDTDTM21.);*/
|
|
/* SPECIAL LOGIC FOR SPECIAL VARS */
|
|
%end;
|
|
|
|
if _error_ ne 0 then do;
|
|
putlog _infile_;
|
|
call symputx('mpeloadstop',_n_);
|
|
stop;
|
|
end;
|
|
/* remove all blank rows */
|
|
if compress(cats(of _all_),'.')=' ' then delete;
|
|
run;
|
|
|
|
%if &mpeloadstop>0 %then %do;
|
|
%if %symexist(SYSPRINTTOLOG) %then %let logloc=&SYSPRINTTOLOG;
|
|
%else %let logloc=%qsysfunc(getoption(LOG));
|
|
%put redirecting log output to capture return message;
|
|
%put currentloc=&logloc;
|
|
filename tmp temp;
|
|
proc printto log=tmp;run;
|
|
data _null_;
|
|
&droplist;
|
|
infile "&csv_dir/&fname" dsd dlm="&dlm" lrecl=32767 firstobs=2
|
|
missover termstr=&termstr;
|
|
attrib &attrib ;
|
|
input
|
|
%if %scan(%quote(&received_vars),1)=_____DELETE__THIS__RECORD_____
|
|
%then %do;
|
|
_____DELETE__THIS__RECORD_____: $3.
|
|
%end;
|
|
&input;
|
|
if _error_ then stop;
|
|
run;
|
|
/* get log back */
|
|
proc printto log=&logloc;run;
|
|
data _null_; infile tmp; input; putlog _infile_;run;
|
|
/* scan log for invalid data warning */
|
|
data _null_;
|
|
infile tmp;
|
|
input;
|
|
length msg1 msg2 msg3 msg4 msg5 msg url $32767;
|
|
if index(_infile_,'NOTE: Invalid data for') then do;
|
|
msg1=_infile_;
|
|
input;
|
|
msg2=_infile_;
|
|
input;
|
|
msg3=_infile_;
|
|
input;
|
|
msg4=_infile_;
|
|
input;
|
|
msg5=_infile_;
|
|
url=symget('url');
|
|
msg=catx('\n',msg1,msg2,msg3,msg4,msg5,'\n',url);
|
|
call symputx('msg',msg);
|
|
stop;
|
|
end;
|
|
run;
|
|
|
|
%mpe_loadfail(
|
|
status=FAILED
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,reason_txt=%superq(msg)
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%return;
|
|
%end;
|
|
|
|
/* check that the table is unique on PK */
|
|
proc sort data=work.STAGING_DS dupout=work.MPE_DUPS (keep=&pk) nodupkey;
|
|
by &pk;
|
|
run;
|
|
%if %mf_getattrn(work.MPE_DUPS,NLOBS)>0 %then %do;
|
|
%local duplist;
|
|
data _null_;
|
|
set work.mpe_dups;
|
|
%do i=1 %to %sysfunc(countw(&pk));
|
|
%let iWord=%scan(&pk,&i);
|
|
call symputx('duplist',symget('duplist')!!
|
|
" &iWord="!!cats(&iWord));
|
|
%end;
|
|
run;
|
|
%let msg=This upload contains duplicates on the Primary Key columns %trim(
|
|
)(&pk) \n Please remove the duplicates and try again. %trim(
|
|
)\n &duplist \n ;
|
|
%mp_abort(msg=%superq(msg),mac=mpe_loader.sas);
|
|
%return;
|
|
%end;
|
|
|
|
%if &syscc gt 4 %then %do;
|
|
%let msg=SYSCC=&syscc prior to post edit hook (%superq(syserrortext));
|
|
%mpe_loadfail(
|
|
status=FAILED - &syscc
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,reason_txt=%superq(msg)
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%return;
|
|
%end;
|
|
|
|
/* If a Complex Excel Upload, needs to have the load ref added to the table */
|
|
%mpe_xlmapvalidate(&mperef,work.staging_ds,&mpelib,&orig_libds)
|
|
|
|
/* Run the Post Edit Hook prior to creation of staging folder */
|
|
%mpe_runhook(POST_EDIT_HOOK)
|
|
|
|
/* stop if err */
|
|
%if &syscc gt 4 %then %do;
|
|
%let msg=ERR in post edit hook (&post_edit_hook);
|
|
%mpe_loadfail(
|
|
status=FAILED - &syscc
|
|
,now=&now
|
|
,mperef=&mperef
|
|
,reason_txt=%quote(&msg)
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%return;
|
|
%end;
|
|
|
|
|
|
/**
|
|
* send to approve process
|
|
*/
|
|
|
|
/* create a dataset key (datetime plus 3 digit random number plus PID) */
|
|
|
|
/* send dataset to approvals subfolder with same name as subfolder */
|
|
libname approval "&mpelocapprovals/&mperef";
|
|
data approval.&mperef;
|
|
set work.staging_ds;
|
|
run;
|
|
proc export data=approval.&mperef
|
|
outfile="&mpelocapprovals/&mperef/&mperef..csv"
|
|
dbms=csv
|
|
replace;
|
|
run;
|
|
|
|
/* update the control dataset with relevant info */
|
|
data append_app;
|
|
if 0 then set &mpelib..mpe_submit;/* get formats */
|
|
call missing (of _all_);
|
|
TABLE_ID="&mperef";
|
|
submit_status_cd='SUBMITTED';
|
|
submitted_by_nm="%mf_getuser()";
|
|
base_lib="&libref";
|
|
base_ds="&ds";
|
|
submitted_on_dttm=&now;
|
|
submitted_reason_txt=symget('submitted_reason_txt');
|
|
input_vars=%mf_getattrn(approval.&mperef,NVARS);
|
|
input_obs=%mf_getattrn(approval.&mperef,NLOBS);
|
|
num_of_approvals_required=&NUM_OF_APPROVALS_REQUIRED;
|
|
num_of_approvals_remaining=&NUM_OF_APPROVALS_REQUIRED;
|
|
reviewed_by_nm='';
|
|
reviewed_on_dttm=.;
|
|
run;
|
|
|
|
%mp_lockanytable(LOCK,lib=&mpelib,ds=mpe_submit,
|
|
ref=%str(&mperef update in &_program),
|
|
ctl_ds=&mpelib..mpe_lockanytable
|
|
)
|
|
proc append base= &mpelib..mpe_submit data=append_app;
|
|
run;
|
|
%mp_lockanytable(UNLOCK,
|
|
lib=&mpelib,ds=mpe_submit,
|
|
ctl_ds=&mpelib..mpe_lockanytable
|
|
)
|
|
|
|
/* send email to REVIEW members */
|
|
%put sending mpe_alerts;
|
|
%mpe_alerts(alert_event=SUBMITTED
|
|
, alert_lib=&libref
|
|
, alert_ds=&ds
|
|
, dsid=&mperef
|
|
)
|
|
/* DISABLE EMAIL FOR NOW
|
|
%let b2=REASON: %quote(&submitted_reason_txt);
|
|
|
|
%local URLNOTES;
|
|
%if %length(¬es)>0 %then %let URLNOTES=%quote(%sysfunc(urlencode(¬es)));
|
|
|
|
%let b3=%str(Click to review / approve: )%trim(
|
|
)%str(http://&_srvname:&_srvport&_url?_PROGRAM=/Web/approvals&)%trim(
|
|
)TABLEID=&dsid%str(&)BASETABLE=&libref..&ds%str(&)NOTES=&URLNOTES;
|
|
|
|
%let b4=%str(Reference ID: &mperef);
|
|
*/
|
|
|
|
%put mpe_loader finishing up with syscc=&syscc;
|
|
%if &syscc le 4 %then %do;
|
|
%local dur;
|
|
data _null_;
|
|
now=symget('now');
|
|
dur=%sysfunc(datetime())-&now;
|
|
call symputx('dur',dur,'l');
|
|
putlog 'Updating mpe_loads with the following query:';
|
|
putlog "update &mpelib..mpe_loads set STATUS='SUCCESS'";
|
|
putlog " , duration=" dur;
|
|
putlog " , processed_dttm=" now;
|
|
putlog " , approvals = '&libref..&ds'";
|
|
putlog " where CSV_DIR='&mperef';";
|
|
run;
|
|
proc sql;
|
|
update &mpelib..mpe_loads set STATUS='SUCCESS'
|
|
, duration=&dur
|
|
, processed_dttm=&now
|
|
, approvals = "&libref..&ds"
|
|
where CSV_DIR="&mperef";
|
|
%end;
|
|
%else %do;
|
|
%mpe_loadfail(
|
|
status="FAILED - &syscc"
|
|
,now=&now
|
|
,approvals=&libref..&ds
|
|
,mperef=&mperef
|
|
,dc_dttmtfmt=&dc_dttmtfmt.
|
|
)
|
|
%return;
|
|
%end;
|
|
|
|
%mend mpe_loader;
|