-- Demo input for the map/reduce example: one CLOB document per row.
-- NOTE(review): the LOB is placed in the SYSAUX tablespace, presumably only
-- because SecureFile LOBs need an ASSM tablespace; consider a dedicated user
-- tablespace outside of a throwaway demo -- TODO confirm.
CREATE TABLE documents (a CLOB)
LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

-- Explicit column lists keep the inserts valid if columns are added later.
INSERT INTO documents (a) VALUES ('abc def');
INSERT INTO documents (a) VALUES ('def ghi');
INSERT INTO documents (a) VALUES ('ghi jkl');
COMMIT;
create or replace
package oracle_map_reduce as

  -- A single word produced by the mapper.
  type word_t is record (
    word varchar2(4000)
  );

  -- Collection type returned (row by row) by the pipelined mapper.
  type words_t is table of word_t;

  -- Strongly typed cursor over word_t rows; the reducer's input type.
  type word_cur_t is ref cursor return word_t;

  -- A word together with the number of times it was seen.
  type wordcnt_t is record (
    word  varchar2(4000),
    count number
  );

  -- Collection type returned (row by row) by the pipelined reducer.
  type wordcnts_t is table of wordcnt_t;

  -- Splits every document fetched from "doc" on the separator "sep" and
  -- pipes out one word_t row per word.
  -- Declared parallel-enabled with the input cursor partitioned "by any".
  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
  pipelined parallel_enable (partition doc by any);

  -- Consumes word rows from "in_cur" and pipes out one wordcnt_t row per run
  -- of equal words, carrying the length of that run.
  -- Declared parallel-enabled with the input hash-partitioned and clustered
  -- on "word".
  function reducer(in_cur in word_cur_t) return wordcnts_t
  pipelined parallel_enable (partition in_cur by hash(word))
  cluster in_cur by (word);

end oracle_map_reduce;
/
create or replace
package body oracle_map_reduce is

  --
  -- The mapper is a simple tokenizer: it fetches documents from "doc",
  -- splits each one on the separator "sep" and pipes out one row per word.
  --
  -- Parameters:
  --   doc - cursor returning a single CLOB column (one document per row)
  --   sep - separator string; may be longer than one character. When it is
  --         NULL no separator can be found, so each non-empty document is
  --         emitted as a single word.
  -- Returns (pipelined): one word_t row per non-empty token. Empty tokens
  --   produced by leading, trailing or consecutive separators are skipped.
  -- NOTE(review): word_t.word is varchar2(4000); a token longer than that is
  --   expected to raise a value error on assignment -- TODO confirm whether
  --   truncation or an explicit error is wanted.
  --
  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
  pipelined parallel_enable (partition doc by any)
  is
    document clob;
    istart   number;   -- 1-based position where the current token starts
    pos      number;   -- position of the next separator, 0 when none is left
    len      number;   -- length of the current document
    sep_len  number;   -- length of the separator (NULL when sep is NULL)
    word_rec word_t;
  begin
    sep_len := length(sep);

    -- for every document
    loop
      fetch doc into document;
      exit when doc%notfound;

      istart := 1;
      -- A NULL document has NULL length; treat it as empty.
      len := nvl(length(document), 0);

      -- for every word within a document
      while istart <= len loop
        -- instr returns NULL for a NULL separator; treat that as "not found".
        pos := nvl(instr(document, sep, istart), 0);

        if pos = 0 then
          -- No further separator: the rest of the document is the last token.
          word_rec.word := substr(document, istart);
          istart := len + 1;
        else
          word_rec.word := substr(document, istart, pos - istart);
          -- Skip the whole separator, not just its first character.
          istart := pos + sep_len;
        end if;

        -- A zero-length token is NULL in Oracle; do not emit it as a word.
        if word_rec.word is not null then
          pipe row (word_rec);
        end if;
      end loop; -- end loop for a single document
    end loop; -- end loop for all documents

    close doc;
    return;
  end mapper;

  --
  -- The reducer emits words and the number of times they're seen.
  --
  -- Parameters:
  --   in_cur - cursor of word rows. Equal words must arrive consecutively;
  --            the "cluster in_cur by (word)" clause requests that ordering.
  -- Returns (pipelined): one wordcnt_t row per run of equal words, with
  --   "count" set to the length of the run. NULL words are counted as a run
  --   of their own instead of being added to the previous word's count.
  --
  function reducer(in_cur in word_cur_t) return wordcnts_t
  pipelined parallel_enable (partition in_cur by hash(word))
  cluster in_cur by (word)
  is
    word_count wordcnt_t;
    next_word  varchar2(4000);
  begin
    -- count = 0 means "no row seen yet"; it is the only first-row marker, so
    -- a NULL word cannot be confused with the initial state.
    word_count.count := 0;

    loop
      fetch in_cur into next_word;
      exit when in_cur%notfound;

      if word_count.count = 0 then
        -- First row: start the first run.
        word_count.word := next_word;
        word_count.count := 1;
      elsif next_word = word_count.word
         or (next_word is null and word_count.word is null) then
        -- Same word as the current run (NULL-safe comparison).
        word_count.count := word_count.count + 1;
      else
        -- Word changed: emit the finished run and start a new one.
        pipe row (word_count);
        word_count.word := next_word;
        word_count.count := 1;
      end if;
    end loop;

    -- Emit the last run, if any input was seen at all.
    if word_count.count <> 0 then
      pipe row (word_count);
    end if;

    close in_cur;
    return;
  end reducer;

end oracle_map_reduce;
/
-- Select statements

-- Word count using the mapper only: SQL GROUP BY does the "reduce" step.
-- Ordered by word so the demo output is deterministic.
select word, count(*)
from (
    select value(map_result).word word
    from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result
)
group by word
order by word;

-- Word count using the full pipeline: the mapper output is fed to the reducer
-- through a cursor expression. Columns are listed explicitly instead of "*".
select reduce_result.word, reduce_result.count
from table(oracle_map_reduce.reducer(
    cursor(select value(map_result).word word
           from table(oracle_map_reduce.mapper(
               cursor(select a from documents), ' ')) map_result))) reduce_result
order by reduce_result.word;
-- 在 Oracle 数据库中实现 MapReduce(2)
-- 内容版权声明:除非注明,否则皆为本站原创文章。
-- 转载注明出处:https://www.heiqu.com/6dfbdc0d9be9cce0fd7f68dc481ca686.html